PK!AEmolten/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .app import App, BaseApp from .common import annotate from .dependency_injection import Component, DependencyInjector, DependencyResolver from .errors import ( DIError, FieldTooLarge, FieldValidationError, FileTooLarge, HeaderMissing, HTTPError, MoltenError, ParamMissing, ParseError, RequestHandled, RequestParserNotAvailable, RouteNotFound, RouteParamMissing, TooManyFields, ValidationError ) from .helpers import RedirectType, redirect from .http import ( Cookie, Cookies, Headers, QueryParams, Request, Response, StreamingResponse, UploadedFile ) from .http.status_codes import * from .middleware import ResponseRendererMiddleware from .parsers import JSONParser, MultiPartParser, RequestParser, URLEncodingParser from .renderers import JSONRenderer, ResponseRenderer from .router import Include, Route, Router from .settings import Settings, SettingsComponent from .testing import TestClient, TestResponse, to_environ from .typing import ( Environ, Header, Host, Method, Middleware, Port, QueryParam, QueryString, RequestBody, RequestData, RequestInput, Scheme, StartResponse ) from .validation import ( Field, Missing, Validator, dump_schema, field, is_schema, load_schema, schema ) __version__ = "0.5.2" __all__ = [ "BaseApp", "App", "Middleware", "annotate", # Settings "Settings", "SettingsComponent", # Router "Router", "Route", "Include", # WSGI "Environ", "StartResponse", # HTTP "Method", "Scheme", "Host", "Port", "QueryString", "QueryParams", "QueryParam", "Headers", "Header", "RequestInput", "RequestBody", "RequestData", "Cookies", "Cookie", "UploadedFile", "Request", "Response", "StreamingResponse", # Dependency-injection "DependencyInjector", "DependencyResolver", "Component", # Parsers "RequestParser", "JSONParser", "URLEncodingParser", "MultiPartParser", # Renderers "ResponseRenderer", "JSONRenderer", # Middleware "ResponseRendererMiddleware", # Validation "Field", "Missing", "Validator", "field", "schema", "is_schema", "dump_schema", "load_schema", # Helpers "RedirectType", "redirect", # Errors "MoltenError", "DIError", "HTTPError", "RouteNotFound", "RouteParamMissing", "RequestHandled", "RequestParserNotAvailable", "ParseError", "FieldTooLarge", "FileTooLarge", "TooManyFields", "HeaderMissing", "ParamMissing", "ValidationError", "FieldValidationError", # Testing "TestClient", "TestResponse", "to_environ", # Status codes # 1xx "HTTP_100", "HTTP_101", "HTTP_102", # 2xx "HTTP_200", "HTTP_201", "HTTP_202", "HTTP_203", "HTTP_204", "HTTP_205", "HTTP_206", "HTTP_207", "HTTP_208", # 3xx "HTTP_300", "HTTP_301", "HTTP_302", "HTTP_303", "HTTP_304", "HTTP_305", "HTTP_307", "HTTP_308", # 4xx "HTTP_400", "HTTP_401", "HTTP_402", "HTTP_403", "HTTP_404", "HTTP_405", "HTTP_406", "HTTP_407", "HTTP_408", "HTTP_409", "HTTP_410", "HTTP_411", "HTTP_412", "HTTP_413", "HTTP_414", "HTTP_415", "HTTP_416", "HTTP_417", "HTTP_418", "HTTP_421", "HTTP_422", "HTTP_423", "HTTP_424", "HTTP_426", "HTTP_428", "HTTP_429", "HTTP_431", "HTTP_444", "HTTP_451", "HTTP_499", # 5xx "HTTP_500", "HTTP_501", "HTTP_502", "HTTP_503", "HTTP_504", "HTTP_505", "HTTP_506", "HTTP_507", "HTTP_508", "HTTP_510", "HTTP_511", "HTTP_599", ] PK!("" molten/app.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import logging import sys from typing import Any, Callable, Iterable, List, Optional from wsgiref.util import FileWrapper # type: ignore from .components import ( CookiesComponent, HeaderComponent, QueryParamComponent, RequestBodyComponent, RequestDataComponent, RouteComponent, RouteParamsComponent, SchemaComponent ) from .dependency_injection import Component, DependencyInjector from .errors import ParseError, RequestHandled, RequestParserNotAvailable from .http import ( HTTP_204, HTTP_400, HTTP_404, HTTP_415, HTTP_500, Headers, QueryParams, Request, Response ) from .parsers import JSONParser, MultiPartParser, RequestParser, URLEncodingParser from .renderers import JSONRenderer, ResponseRenderer from .router import RouteLike, Router from .typing import ( Environ, Host, Method, Middleware, Port, QueryString, RequestInput, Scheme, StartResponse ) try: from gunicorn.http.errors import NoMoreData except ImportError: # pragma: no cover pass LOGGER = logging.getLogger(__name__) class BaseApp: """Base class for App implementations. Parameters: routes: An optional list of routes to register with the router. middleware: An optional list of middleware. If provided, this replaces the default set of middleware, including the response renderer so make sure to include that in your middleware list. parsers: An optional list of request parsers to use. If provided, this replaces the default list of request parsers. renderers: An optional list of response renderers. If provided, this replaces the default list of response renderers. """ def __init__( self, routes: Optional[List[RouteLike]] = None, middleware: Optional[List[Middleware]] = None, components: Optional[List[Component[Any]]] = None, parsers: Optional[List[RequestParser]] = None, renderers: Optional[List[ResponseRenderer]] = None, ) -> None: # ResponseRendererMiddleware needs to be able to look up # middleware off of the app which leads to an import cycle # since the two are dependant on one another. from .middleware import ResponseRendererMiddleware self.router = Router(routes) self.add_route = self.router.add_route self.add_routes = self.router.add_routes self.reverse_uri = self.router.reverse_uri self.parsers = parsers or [ JSONParser(), URLEncodingParser(), MultiPartParser(), ] self.renderers = renderers or [ JSONRenderer(), ] self.middleware = middleware or [ ResponseRendererMiddleware() ] self.components = (components or []) + [ HeaderComponent(), CookiesComponent(), QueryParamComponent(), RequestBodyComponent(), RequestDataComponent(self.parsers), SchemaComponent(), ] self.injector = DependencyInjector( components=self.components, singletons={BaseApp: self}, # type: ignore ) def handle_404(self) -> Response: """Called whenever a route cannot be found. Dependencies are injected into this just like a normal handler. """ return Response(HTTP_404, content="Not Found") def handle_415(self) -> Response: """Called whenever a request comes in with an unsupported content type. Dependencies are injected into this just like a normal handler. """ return Response(HTTP_415, content="Unsupported Media Type") def handle_parse_error(self, exception: ParseError) -> Response: """Called whenever a request comes in with a payload that fails to parse. Dependencies are injected into this just like a normal handler. Parameters: exception: The ParseError that was raised by the request parser on failure. """ LOGGER.warning("Request cannot be parsed: %s", exception) return Response(HTTP_400, content=f"Request cannot be parsed: {exception}") def handle_exception(self, exception: BaseException) -> Response: """Called whenever an unhandled exception occurs in middleware or a handler. Dependencies are injected into this just like a normal handler. Parameters: exception: The exception that occurred. """ LOGGER.exception("An unhandled exception occurred.") return Response(HTTP_500, content="Internal Server Error") def __call__(self, environ: Environ, start_response: StartResponse) -> Iterable[bytes]: # pragma: no cover raise NotImplementedError("apps must implement '__call__'") class App(BaseApp): """An application that implements the WSGI interface. Parameters: routes: An optional list of routes to register with the router. middleware: An optional list of middleware. If provided, this replaces the default set of middleware, including the response renderer so make sure to include that in your middleware list. parsers: An optional list of request parsers to use. If provided, this replaces the default list of request parsers. renderers: An optional list of response renderers. If provided, this replaces the default list of response renderers. """ def __call__(self, environ: Environ, start_response: StartResponse) -> Iterable[bytes]: request = Request.from_environ(environ) resolver = self.injector.get_resolver({ Environ: environ, Headers: request.headers, Host: Host(request.host), Method: Method(request.method), Port: Port(request.port), QueryParams: request.params, QueryString: QueryString(environ.get("QUERY_STRING", "")), Request: request, RequestInput: RequestInput(request.body_file), Scheme: Scheme(request.scheme), StartResponse: start_response, }) try: handler: Callable[..., Any] route_and_params = self.router.match(request.method, request.path) if route_and_params is not None: route, params = route_and_params handler = route.handler resolver.add_component(RouteComponent(route)) resolver.add_component(RouteParamsComponent(params)) else: handler = self.handle_404 resolver.add_component(RouteComponent(None)) handler = resolver.resolve(handler) for middleware in reversed(self.middleware): handler = resolver.resolve(middleware(handler)) exc_info = None response = handler() except RequestHandled: # This is used to break out of gunicorn's keep-alive loop. # If we don't do this, then gunicorn might attempt to read # from a closed socket. raise NoMoreData() except RequestParserNotAvailable: exc_info = None response = resolver.resolve(self.handle_415)() except ParseError as e: exc_info = None response = resolver.resolve(self.handle_parse_error)(exception=e) except Exception as e: exc_info = sys.exc_info() response = resolver.resolve(self.handle_exception)(exception=e) content_length = response.get_content_length() if content_length is not None: response.headers.add("content-length", str(content_length)) start_response(response.status, list(response.headers), exc_info) if response.status != HTTP_204: wrapper = environ.get("wsgi.file_wrapper", FileWrapper) return wrapper(response.stream) else: return [] PK!B molten/common.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from collections import defaultdict from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union KT = TypeVar("KT") VT = TypeVar("VT") Mapping = Union[ Dict[KT, Union[VT, List[VT]]], Iterable[Tuple[KT, Union[VT, List[VT]]]] ] class MultiDict(Iterable[Tuple[KT, VT]]): """A mapping from param names to lists of values. Once constructed, these instances cannot be modified. """ __slots__ = ["_data"] def __init__(self, mapping: Optional[Mapping[KT, VT]] = None) -> None: self._data: Dict[KT, List[VT]] = defaultdict(list) self._add_all(mapping or {}) def _add(self, name: KT, value: Union[VT, List[VT]]) -> None: """Add values for a particular key. """ if isinstance(value, list): self._data[name].extend(value) else: self._data[name].append(value) def _add_all(self, mapping: Mapping[KT, VT]) -> None: """Add a group of values. """ items: Iterable[Tuple[KT, Union[VT, List[VT]]]] if isinstance(mapping, dict): items = mapping.items() else: items = mapping for name, value_or_values in items: self._add(name, value_or_values) def get(self, name: KT, default: Optional[VT] = None) -> Optional[VT]: """Get the last value for a given key. """ try: return self[name] except KeyError: return default def get_all(self, name: KT) -> List[VT]: """Get all the values for a given key. """ return self._data[name] def __getitem__(self, name: KT) -> VT: """Get the last value for a given key. Raises: KeyError: When the key is missing. """ try: return self._data[name][-1] except IndexError: raise KeyError(name) def __iter__(self) -> Iterator[Tuple[KT, VT]]: """Iterate over all the parameters. """ for name, values in self._data.items(): for value in values: yield name, value def __repr__(self) -> str: mapping = ", ".join(f"{repr(name)}: {repr(value)}" for name, value in self._data.items()) return f"{type(self).__name__}({{{mapping}}})" def annotate(**options: Any) -> Callable[..., Any]: """Add arbitrary attributes to a callable. Examples: >>> @annotate(openapi_tags=["a", "b"]) ... def some_handler(): ... ... >>> some_handler.openapi_tags ["a", "b"] """ def wrapper(fn: Callable[..., Any]) -> Callable[..., Any]: for name, value in options.items(): setattr(fn, name, value) return fn return wrapper PK!5molten/components.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from inspect import Parameter from typing import Any, Dict, List, Optional, TypeVar from .dependency_injection import DependencyResolver from .errors import ( HeaderMissing, HTTPError, ParamMissing, RequestParserNotAvailable, ValidationError ) from .http import HTTP_400, Cookies, Headers, QueryParams from .parsers import RequestParser from .router import Route from .typing import ( Header, QueryParam, RequestBody, RequestData, RequestInput, extract_optional_annotation ) from .validation import is_schema, load_schema _T = TypeVar("_T") class HeaderComponent: """Retrieves a named header from the request. Examples: def handle(content_type: Header) -> Response: ... def handle(content_type: Optional[Header]) -> Response: ... """ is_cacheable = False is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: _, annotation = extract_optional_annotation(parameter.annotation) return annotation is Header def resolve(self, parameter: Parameter, headers: Headers) -> Optional[str]: is_optional, _ = extract_optional_annotation(parameter.annotation) header_name = parameter.name.replace("_", "-") try: return headers[header_name] except HeaderMissing: if is_optional: return None raise HTTPError(HTTP_400, {"errors": {header_name: "missing"}}) class QueryParamComponent: """Retrieves a named query param from the request. Examples: def handle(x: QueryParam) -> Response: ... def handle(x: Optional[QueryParam]) -> Response: ... """ is_cacheable = False is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: _, annotation = extract_optional_annotation(parameter.annotation) return annotation is QueryParam def resolve(self, parameter: Parameter, params: QueryParams) -> Optional[str]: is_optional, _ = extract_optional_annotation(parameter.annotation) try: return params[parameter.name] except ParamMissing: if is_optional: return None raise HTTPError(HTTP_400, {"errors": {parameter.name: "missing"}}) class RequestBodyComponent: """A component that reads the entire request body into a string. Examples: def handle(body: RequestBody) -> Response: ... """ is_cacheable = True is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is RequestBody def resolve(self, content_length: Header, body_file: RequestInput) -> RequestBody: return RequestBody(body_file.read(int(content_length))) class RequestDataComponent: """A component that parses request data based on the content-type header and the set of registered request parsers. If no request parser is available, then an HTTP 415 response is returned. Examples: def handle(data: RequestData) -> Response: ... """ __slots__ = ["parsers"] is_cacheable = True is_singleton = False def __init__(self, parsers: List[RequestParser]) -> None: self.parsers = parsers def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is RequestData def resolve(self, content_type: Optional[Header], resolver: DependencyResolver) -> RequestData: content_type_str = (content_type or "").lower() for parser in self.parsers: if parser.can_parse_content(content_type_str): return RequestData(resolver.resolve(parser.parse)()) raise RequestParserNotAvailable(content_type_str) class CookiesComponent: """A component that parses request cookies. Examples: def handle(cookies: Cookies) -> Response: cookies["some-cookie"] """ is_cacheable = True is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Cookies def resolve(self, cookie: Optional[Header]) -> Cookies: if cookie is None: return Cookies() return Cookies.parse(cookie) class RouteComponent: """A component that resolves the current route. """ __slots__ = ["route"] is_cacheable = True is_singleton = False def __init__(self, route: Optional[Route]) -> None: self.route = route def can_handle_parameter(self, parameter: Parameter) -> bool: _, annotation = extract_optional_annotation(parameter.annotation) return annotation is Route def resolve(self) -> Optional[Route]: return self.route class RouteParamsComponent: """A component that resolves route params. Examples: def handle(name: str, age: int) -> Response: ... app.add_route(Route("/{name}/{age}", handle)) """ __slots__ = ["params"] is_cacheable = False is_singleton = False def __init__(self, params: Dict[str, str]) -> None: self.params = params def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.name in self.params def resolve(self, parameter: Parameter) -> Any: try: return parameter.annotation(self.params[parameter.name]) except (TypeError, ValueError): raise HTTPError(HTTP_400, {"errors": { parameter.name: f"invalid {parameter.annotation.__name__} value", }}) class SchemaComponent: """A component that validates request data according to a schema. """ is_cacheable = False is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return is_schema(parameter.annotation) def resolve(self, parameter: Parameter, data: RequestData) -> Any: try: return load_schema(parameter.annotation, data) except ValidationError as e: raise HTTPError(HTTP_400, {"errors": e.reasons}) PK!& ##molten/contrib/dramatiq.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import functools import inspect from inspect import Parameter from typing import Any, Callable, Dict, Optional, Sequence, no_type_check from molten import BaseApp try: import dramatiq except ImportError: # pragma: no cover raise ImportError("'dramatiq' package missing. Run 'pip install dramatiq'.") #: The global dependency injector instace. Call setup_dramatiq to set #: this up. _INJECTOR = None def setup_dramatiq(app: BaseApp) -> None: """Sets up the global state required to be able to inject components into Dramatiq actors. Examples: >>> from molten.contrib.dramatiq import setup_dramatiq >>> # All components that were registered with your app will be >>> # available to your actors once you call this function. >>> setup_dramatiq(app) """ global _INJECTOR _INJECTOR = app.injector @no_type_check def actor(fn=None, **kwargs): """Use this in place of dramatiq.actor in order to create actors that can request components via dependency injection. This is just a wrapper around dramatiq.actor and it takes the same set of parameters. Examples: >>> from molten.contrib.dramatiq import actor >>> @actor(queue_name="example") ... def add(x, y, database: Database) -> None: ... database.put(x + y) ... >>> add.send(1, 2) """ def decorator(fn): return dramatiq.actor(_inject(fn), **kwargs) if fn is None: return decorator return decorator(fn) @no_type_check def _inject(fn: Optional[Callable[..., Any]] = None) -> Callable[..., Any]: def decorator(fn): parameters = {name: i for i, name in enumerate(inspect.signature(fn).parameters)} @functools.wraps(fn) def wrapper(*args, **kwargs): try: resolver = _INJECTOR.get_resolver() resolver.add_component(_ArgumentResolver(parameters, args, kwargs)) except AttributeError: # pragma: no cover raise RuntimeError( "Dramatiq support is not set up correctly. " "Don't forget to call setup_dramatiq()." ) resolved_fn = resolver.resolve(fn) return resolved_fn() return wrapper if fn is None: # pragma: no cover return decorator return decorator(fn) class _ArgumentResolver: is_cacheable = False is_singleton = False def __init__(self, parameters: Dict[str, int], args: Sequence[Any], kwargs: Dict[str, Any]) -> None: self.state = state = kwargs for name, idx in parameters.items(): if name not in state: try: state[name] = args[idx] except IndexError: continue def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.name in self.state or \ parameter.default is not Parameter.empty def resolve(self, parameter: Parameter) -> Any: try: return self.state[parameter.name] except KeyError: return parameter.default PK! BIm m molten/contrib/msgpack.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from io import BytesIO from typing import Any from molten import ParseError, RequestInput, Response, dump_schema, is_schema try: from msgpack import Unpacker, packb # type: ignore except ImportError: # pragma: no cover raise ImportError("'msgpack' package missing. Run 'pip install msgpack'.") class MsgpackParser: """A msgpack_ request parser. .. _msgpack: https://msgpack.org/ """ mime_type = "application/x-msgpack" def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("application/x-msgpack") def parse(self, body_file: RequestInput) -> Any: try: return next(Unpacker(body_file, raw=False)) except Exception as e: raise ParseError(f"msgpack input could not be parsed: {e}") class MsgpackRenderer: """A msgpack_ response renderer. .. _msgpack: https://msgpack.org/ """ mime_type = "application/x-msgpack" def can_render_response(self, accept: str) -> bool: return accept.startswith("application/x-msgpack") def render(self, status: str, response_data: Any) -> Response: content = packb(response_data, use_bin_type=True, default=self.default) return Response(status, stream=BytesIO(content), headers={ "content-type": "application/x-msgpack", }) def default(self, ob: Any) -> Any: """You may override this when subclassing the JSON renderer in order to encode non-standard object types. """ if is_schema(type(ob)): return dump_schema(ob) raise TypeError(f"cannot encode values of type {type(ob)}") # pragma: no cover PK!I8 8 molten/contrib/prometheus.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import time from io import BytesIO from typing import Any, Callable from molten import HTTP_200, Request, Response try: from prometheus_client import ( CONTENT_TYPE_LATEST, CollectorRegistry, Counter, Gauge, Histogram, generate_latest, multiprocess ) except ImportError: # pragma: no cover raise ImportError("'prometheus_client' package missing. Run 'pip install prometheus-client'.") REQUEST_DURATION = Histogram( "http_request_duration_seconds", "Time spent processing a request.", ["method", "path"], ) REQUEST_COUNT = Counter( "http_requests_total", "Request count by method, path and status line.", ["method", "path", "status"], ) REQUESTS_INPROGRESS = Gauge( "http_requests_inprogress", "Requests in progress by method and path", ["method", "path"], ) #: Micro-optimization to avoid allocating a new dict on every metrics #: request. Response itself copies the headers its given so this #: shouldn't be a problem. _HEADERS = {"content-type": CONTENT_TYPE_LATEST} def expose_metrics() -> Response: """Expose prometheus metrics from the current process. """ return Response(HTTP_200, headers=_HEADERS, stream=BytesIO(generate_latest())) def expose_metrics_multiprocess() -> Response: # pragma: no cover """Expose prometheus metrics from the current set of processes. Use this instead of expose_metrics if you're using a multi-process server. """ registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) return Response(HTTP_200, headers=_HEADERS, stream=BytesIO(generate_latest(registry))) def prometheus_middleware(handler: Callable[..., Any]) -> Callable[..., Any]: """Collect prometheus metrics from your handlers. """ def middleware(request: Request) -> Any: status = "500 Internal Server Error" start_time = time.monotonic() requests_inprogress = REQUESTS_INPROGRESS.labels(request.method, request.path) requests_inprogress.inc() try: response = handler() status = response.status return response finally: requests_inprogress.dec() REQUEST_COUNT.labels(request.method, request.path, status).inc() REQUEST_DURATION.labels(request.method, request.path).observe(time.monotonic() - start_time) return middleware PK!nP P molten/contrib/request_id.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import logging from threading import local from typing import Any, Callable, Optional from uuid import uuid4 from molten import Header STATE = local() def get_request_id() -> Optional[str]: """Retrieves the request id for the current thread. """ return getattr(STATE, "request_id", None) def set_request_id(request_id: Optional[str]) -> None: """Set a request id for the current thread. If ``request_id`` is None, then a random id will be generated. """ if request_id is None: request_id = str(uuid4()) STATE.request_id = request_id class RequestIdFilter(logging.Filter): """Adds the current request id to log records, making it possible to log request ids via the standard logging module. Example logging configuration:: import logging.config logging.config.dictConfig({ "version": 1, "filters": { "request_id": { "()": "molten.contrib.request_id.RequestIdFilter" }, }, "formatters": { "standard": { "format": "%(levelname)-8s [%(asctime)s] [%(request_id)s] %(name)s: %(message)s" }, }, "handlers": { "console": { "level": "DEBUG", "class": "logging.StreamHandler", "filters": ["request_id"], "formatter": "standard", }, }, "loggers": { "myapp": { "handlers": ["console"], "level": "DEBUG", "propagate": False, }, } }) """ def filter(self, record: Any) -> bool: record.request_id = get_request_id() return True class RequestIdMiddleware: """Adds an x-request-id to responses containing a unique request id value. If the incoming request has an x-request-id header then that value is reused for the response. This makes it easy to trace requests within a microservice architecture. """ def __call__(self, handler: Callable[..., Any]) -> Callable[..., Any]: def middleware(x_request_id: Optional[Header]) -> Any: set_request_id(x_request_id) response = handler() response.headers.add("x-request-id", get_request_id()) return response return middleware PK!}molten/contrib/sessions.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import base64 import hmac import json from inspect import Parameter from time import time from typing import Any, Callable, Dict, Optional, no_type_check from uuid import uuid4 from typing_extensions import Protocol from molten.dependency_injection import DependencyResolver from molten.http import Cookie, Cookies #: The name of the key the CookieStore inserts into session objects to #: represent the expiration time of the session. COOKIE_EXPIRATION_KEY = "__EXP__" #: The default if the expiration key is not set on the incoming #: session object. DEFAULT_EXPIRATION_TIME = float("inf") class Session(Dict[str, Any]): """Session objects are ordinary dictionaries that are guaranteed to be constructed with an "id" key. """ def __init__(self, id: str, **data: Dict[str, Any]) -> None: super().__init__(id=id, **data) @classmethod def empty(cls) -> "Session": """Create an empty session with a random id. """ return cls(str(uuid4())) class SessionStore(Protocol): """Protocol for session stores. """ @no_type_check def load(self) -> Session: """Load a session from the request. This method may request components via DI. """ @no_type_check def dump(self) -> Cookie: """Convert the session to a Cookie and possibly store it somewhere (eg. memcached, Redis). This method may request components via DI. """ class CookieStore: """A stateless session store based on cookies. Sessions are converted to JSON and then base64-encoded. The values are signed with a signing key and validated when the sessions are subsequently loaded. An expiration time is inserted into the session before it's dumped so as to provide minimal protection against session replay attacks. Warning: Don't store sensitive information in sessions using this store. They are tamper-proof, but users can decode them. """ __slots__ = [ "signing_key", "signing_method", "cookie_ttl", "cookie_name", "cookie_domain", "cookie_path", "cookie_secure", ] def __init__( self, signing_key: bytes, *, signing_method: str = "sha256", cookie_ttl: int = 86400 * 7, cookie_name: str = "__sess__", cookie_domain: Optional[str] = None, cookie_path: Optional[str] = None, cookie_secure: bool = False, ) -> None: self.signing_key = signing_key self.signing_method = signing_method self.cookie_ttl = cookie_ttl self.cookie_name = cookie_name self.cookie_domain = cookie_domain self.cookie_path = cookie_path def load(self, cookies: Cookies) -> Session: cookie = cookies.get(self.cookie_name) if cookie is None: return Session.empty() data, _, signature = cookie.partition(",") if not hmac.compare_digest(signature, self.sign(data.encode())): return Session.empty() # Note: at this point, the data is guaranteed to be valid # given that the data is correctly signed. session_data = json.loads(base64.urlsafe_b64decode(data)) session = Session(**session_data) if session.get(COOKIE_EXPIRATION_KEY, DEFAULT_EXPIRATION_TIME) <= time(): return Session.empty() return session def dump(self, session: Session) -> Cookie: session[COOKIE_EXPIRATION_KEY] = expires = time() + self.cookie_ttl session_data = base64.urlsafe_b64encode(json.dumps(session).encode()) signature = self.sign(session_data) return Cookie( self.cookie_name, f"{session_data.decode()},{signature}", domain=self.cookie_domain, path=self.cookie_path, expires=expires, http_only=True, same_site="strict", ) def sign(self, value: bytes) -> str: return hmac.new(self.signing_key, value, self.signing_method).hexdigest() class SessionComponent: """A component that loads Session objects from the request. Parameters: store: A session store. """ __slots__ = ["store"] is_cacheable = True is_singleton = False def __init__(self, store: SessionStore) -> None: self.store = store def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Session def resolve(self, resolver: DependencyResolver) -> Session: return resolver.resolve(self.store.load)() class SessionMiddleware: """A middleware that dumps Session data into the response. Parameters: store: A session store. """ __slots__ = ["store"] def __init__(self, store: SessionStore) -> None: self.store = store def __call__(self, handler: Callable[..., Any]) -> Callable[..., Any]: def middleware(resolver: DependencyResolver) -> Callable[..., Any]: response = handler() response.set_cookie(resolver.resolve(self.store.dump)()) return response return middleware PK!hAmolten/contrib/sqlalchemy.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from collections import namedtuple from inspect import Parameter from typing import Any, Callable, NewType, Optional from molten import DependencyResolver, Settings try: from sqlalchemy import create_engine # type: ignore from sqlalchemy.orm import Session, sessionmaker except ImportError: # pragma: no cover raise ImportError("'sqlalchemy' package missing. Run 'pip install sqlalchemy'.") #: The type of session factories. SessionFactory = NewType("SessionFactory", sessionmaker) # type: ignore #: A named tuple containing an instantiated SQLAlchemy ``engine`` #: object and the ``session_factory``. EngineData = namedtuple("EngineData", "engine, session_factory") class SQLAlchemyEngineComponent: """A component that sets up an SQLAlchemy Engine. This component depends on the availability of a :class:`molten.Settings` component. Your settings dictionary must contain a ``database_engine_dsn`` setting pointing at the database to use. Additionally, you may provide a ``database_engine_params`` setting represeting dictionary data that will be passed directly to ``sqlalchemy.create_engine``. Examples: >>> from molten import App >>> from molten.contrib.sqlalchemy import SQLAlchemyEngineComponent, SQLAlchemySessionComponent, SQLAlchemyMiddleware >>> from molten.contrib.toml_settings import TOMLSettingsComponent >>> app = App( ... components=[ ... TOMLSettingsComponent(), ... SQLAlchemyEngineComponent(), ... SQLAlchemySessionComponent(), ... ], ... middleware=[SQLAlchemyMiddleware()], ... ) """ is_cacheable = True is_singleton = True def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is EngineData def resolve(self, settings: Settings) -> EngineData: engine = create_engine( settings.strict_get("database_engine_dsn"), **settings.get("database_engine_params", {}), ) session_factory = sessionmaker() session_factory.configure(bind=engine) return EngineData(engine, session_factory) class SQLAlchemySessionComponent: """A component that creates and injects SQLAlchemy sessions. Examples: >>> def find_todos(session: Session) -> List[Todo]: ... todos = session.query(TodoModel).all() ... ... """ is_cacheable = True is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Session def resolve(self, engine_data: EngineData) -> Session: # type: ignore return engine_data.session_factory() class SQLAlchemyMiddleware: """A middleware that automatically commits SQLAlchemy sessions on handler success and automatically rolls back sessions on handler failure. Sessions are only instantiated and operated upon if the handler or any other middleware has requested an SQLAlchemy session object via DI. This means that handlers that don't request a Session object don't automatically connect to the Database. """ def __call__(self, handler: Callable[..., Any]) -> Callable[..., Any]: def middleware(resolver: DependencyResolver) -> Any: session = None try: response = handler() session = get_optional_session(resolver) if session is not None: session.commit() return response except Exception: session = get_optional_session(resolver) if session is not None: session.rollback() raise finally: if session is not None: session.close() return middleware def get_optional_session(resolver: DependencyResolver) -> Optional[Session]: # type: ignore """Get a session object from the resolver iff one was previously requested. Returns None if no function has requested a session so far. """ for component, value in resolver.instances.items(): if type(component) is SQLAlchemySessionComponent: return value return None PK!:umolten/contrib/templates.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from inspect import Parameter from typing import Any, Dict from molten import HTTP_200, Response try: import jinja2 except ImportError: # pragma: no cover raise ImportError("'jinja2' missing. Run 'pip install jinja2'.") class Templates: """Renders jinja2 templates. """ __slots__ = ["environment"] def __init__(self, path: str) -> None: self.environment = jinja2.Environment( loader=jinja2.FileSystemLoader(path), ) def render(self, template_name: str, **context: Dict[str, Any]) -> Response: """Find a template and render it. Parameters: template_name: The name of the template to render. \**context: Bindings passed to the template. """ template = self.environment.get_template(template_name) rendered_template = template.render(**context) return Response(HTTP_200, content=rendered_template, headers={ "content-type": "text/html", }) class TemplatesComponent: """A component that builds a jinja2 template renderer. Parameters: path: The path to a folder containing your templates. """ __slots__ = ["path"] is_cacheable = True is_singleton = True def __init__(self, path: str) -> None: self.path = path def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Templates def resolve(self) -> Templates: return Templates(self.path) PK!u molten/contrib/toml_settings.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import os from inspect import Parameter from typing import Optional from molten import Settings as Settings try: import toml except ImportError: # pragma: no cover raise ImportError("'toml' package missing. Run 'pip install toml'.") #: Canary value representing missing values. Missing = object() class TOMLSettings(Settings): """A dictionary of settings parsed from a TOML file. """ @classmethod def from_path(cls, path: str, environment: str) -> "Settings": """Load a TOML file into a dictionary. Raises: FileNotFoundError: When the settings file does not exist. Parameters: path: The path to the TOML file containing your settings. environment: The config environment to use. """ all_settings = toml.load(open(path)) common_settings = all_settings.get("common", {}) environment_settings = all_settings.get(environment, {}) return cls({**common_settings, **environment_settings}) class TOMLSettingsComponent: """A component that loads settings from a TOML file. The settings file should have a "common" section and one section for each environment the application is expected to run in. The environment-specific settings are merged on top of the "common" settings on load. The current environment is determined by the ``ENVIRONMENT`` config variable. Example settings file:: [common] conn_pooling = true conn_pool_size = 1 [dev] [prod] con_pool_size = 32 Examples:: from molten import Settings def handle(settings: Settings): settings.get("conn_pooling") Parameters: path: The path to the TOML file containing your settings. environment: The config environment to use. If not provided, this defaults to the value of the "ENVIRONMENT" environment variable. If that's not set either, then this defaults to "dev". """ __slots__ = ["path", "environment"] is_cacheable = True is_singleton = True def __init__(self, path: str = "./settings.toml", environment: Optional[str] = None) -> None: self.path = path self.environment = environment or os.getenv("ENVIRONMENT", "dev") def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Settings def resolve(self) -> Settings: assert self.environment return TOMLSettings.from_path(self.path, self.environment) PK!Maamolten/contrib/websockets.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # This module is tested using the autobahn testsuite: # https://github.com/crossbario/autobahn-testsuite import io import logging import socket import struct from base64 import b64encode from concurrent.futures import Future, ThreadPoolExecutor from hashlib import sha1 from inspect import Parameter from typing import Any, Callable, Optional, Pattern, Union from molten import ( HTTP_400, HTTP_426, BaseApp, DependencyResolver, Environ, HeaderMissing, HTTPError, MoltenError, Request, RequestHandled, Response, Route, TestClient ) from molten.http.headers import Headers, HeadersDict from molten.http.query_params import ParamsDict, QueryParams try: import gevent except ImportError: # pragma: no cover raise ImportError("'gevent' package missing. Run 'pip install gevent'.") LOGGER = logging.getLogger(__name__) #: The pre-shared key defined in the Websocket spec. PSK = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" #: The amount of bytes to request per recv call. CHUNKSIZE = 16 * 1024 #: The maximum number of bytes text and binary frames can contain. MAX_MESSAGE_SIZE = 16 * 1024 * 1024 #: The maximum number of bytes data frames can contain. MAX_DATA_FRAME_PAYLOAD_SIZE = MAX_MESSAGE_SIZE #: The maximum number of bytes control frames can contain. MAX_CONTROL_FRAME_PAYLOAD_SIZE = 125 #: Continuation frame. Only valid if received after non-final text or binary frames. OP_CONTINUATION = 0x0 #: A frame with a utf-8 encoded payload. May or may not be final. OP_TEXT = 0x1 #: A frame containing binary data. May or may not be final. OP_BINARY = 0x2 #: A frame signaling that the connection should be closed. Always final. OP_CLOSE = 0x8 #: A frame signaling that a PONG frame should be sent to the client. Always final. OP_PING = 0x9 #: A heartbeat frame. Always final. OP_PONG = 0xA #: The set of data frame opcodes. DATA_FRAME_OPCODES = {OP_CONTINUATION, OP_TEXT, OP_BINARY} #: The set of control frame opcodes. CONTROL_FRAME_OPCODES = {OP_CLOSE, OP_PING, OP_PONG} #: The set of all valid opcodes. OPCODES = DATA_FRAME_OPCODES | CONTROL_FRAME_OPCODES #: The set of valid close message status codes. VALID_STATUS_CODES = {1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011} #: The set of reserved close message status codes. RESERVED_STATUS_CODES = {1004, 1005, 1006, 1015} #: The set of supported versions. SUPPORTED_VERSIONS = {"7", "8", "13"} #: The set of supported versions as a string. SUPPORTED_VERSIONS_STR = ",".join(SUPPORTED_VERSIONS) #: The payload that is returned as part of the connection upgrade process. UPGRADE_RESPONSE_TEMPLATE = b"\r\n".join([ b"HTTP/1.1 101 Switching Protocols", b"connection: upgrade", b"upgrade: websocket", b"server: molten", b"sec-websocket-accept: %(websocket_accept)s", b"\r\n", ]) class WebsocketError(MoltenError): """Base class for errors related to websockets. """ class WebsocketProtocolError(WebsocketError): """Raised whenever the protocol is violated. """ class WebsocketMessageTooLargeError(WebsocketProtocolError): """Raised when an incoming message contains too much data. """ class WebsocketFrameTooLargeError(WebsocketError): """Raised when a frame's payload is too large to be sent in a single frame. """ class WebsocketClosedError(WebsocketError): """Raised when a message is sent to a closed socket. """ class _BufferedStream: """A buffered IO stream backed by a socket. This makes data frame parsing simple and efficient because the data frame reader can request data in small byte chunks, whereas this will read the data in large chunks from the socket under the hood. """ __slots__ = ["buf", "closed", "socket"] def __init__(self, socket: socket.socket) -> None: self.buf = b"" self.closed = False self.socket = socket def read(self, n: int) -> bytes: while not self.closed and len(self.buf) < n: data = self.socket.recv(CHUNKSIZE) if not data: self.closed = True return self.buf self.buf += data data, self.buf = self.buf[:n], self.buf[n:] return data def expect(self, n: int) -> bytes: data = self.read(n) if len(data) != n: raise WebsocketProtocolError("Unexpected EOF while reading from socket.") return data def write(self, data: bytes) -> None: self.socket.sendall(data) def close(self) -> None: self.socket.shutdown(True) self.socket.close() class _DataFrameHeader: __slots__ = ["fin", "flags", "opcode", "length", "mask"] RSV1_MASK = 0x40 RSV2_MASK = 0x20 RSV3_MASK = 0x10 FIN_MASK = MASK_MASK = 0x80 FLAGS_MASK = RSV1_MASK | RSV2_MASK | RSV3_MASK OPCODE_MASK = 0x0F LENGTH_MASK = 0x7F def __init__(self, fin: bool = False, flags: int = 0, opcode: int = 0, length: int = 0, mask: Optional[bytearray] = None) -> None: # noqa self.fin = fin self.flags = flags self.opcode = opcode self.length = length self.mask = mask or bytearray() def mask_data(self, data: bytes) -> bytes: data_array = bytearray(data) for i in range(self.length): data_array[i] ^= self.mask[i % 4] return bytes(data_array) @classmethod def from_stream(cls, stream: _BufferedStream) -> "_DataFrameHeader": """Read a data frame header from the input stream. """ read = stream.expect data = read(2) fb, sb = struct.unpack("!BB", data) header = cls( fb & cls.FIN_MASK == cls.FIN_MASK, fb & cls.FLAGS_MASK, fb & cls.OPCODE_MASK, ) length = sb & cls.LENGTH_MASK if length == 126: header.length = struct.unpack("!H", read(2))[0] elif length == 127: header.length = struct.unpack("!Q", read(8))[0] else: header.length = length if sb & cls.MASK_MASK == cls.MASK_MASK: header.mask = bytearray(read(4)) return header def to_stream(self, stream: _BufferedStream) -> None: """Write this header to the output stream. """ output = bytearray() fb = self.opcode if self.fin: fb |= self.FIN_MASK if self.flags & self.RSV1_MASK == self.RSV1_MASK: fb |= self.RSV1_MASK if self.flags & self.RSV2_MASK == self.RSV2_MASK: fb |= self.RSV2_MASK if self.flags & self.RSV3_MASK == self.RSV3_MASK: fb |= self.RSV3_MASK output.append(fb) sb = self.MASK_MASK if self.mask else 0 if self.length < 126: sb |= self.length output.append(sb) elif self.length <= 0xFFFF: sb |= 126 output.append(sb) output.extend(struct.pack("!H", self.length)) elif self.length <= 0xFFFFFFFFFFFFFFF: sb |= 127 output.append(sb) output.extend(struct.pack("!Q", self.length)) else: raise WebsocketFrameTooLargeError(f"{self.length} bytes cannot fit in a single frame.") if self.mask: output.extend(self.mask) stream.write(output) class _DataFrame: __slots__ = ["header", "data"] def __init__(self, header: _DataFrameHeader, data: bytes = b"") -> None: self.header = header self.data = data @classmethod def from_stream(cls, stream: _BufferedStream) -> "_DataFrame": """Read a data frame from an input stream. """ header = _DataFrameHeader.from_stream(stream) if header.opcode not in OPCODES: raise WebsocketProtocolError(f"Invalid opcode 0x{header.opcode:x}.") if header.flags != 0: raise WebsocketProtocolError("Reserved flags must not be set.") if header.opcode in CONTROL_FRAME_OPCODES: max_size = MAX_CONTROL_FRAME_PAYLOAD_SIZE else: max_size = MAX_DATA_FRAME_PAYLOAD_SIZE if header.length > max_size: raise WebsocketMessageTooLargeError(f"Payload exceeds {max_size} bytes.") data = stream.expect(header.length) if header.mask: data = header.mask_data(data) return cls(header, data) def to_stream(self, stream: _BufferedStream) -> None: """Write this data frame to the output stream. """ self.header.to_stream(stream) if self.header.mask: stream.write(self.header.mask_data(self.data)) else: stream.write(self.data) class Message: """A websocket message, composed of one or more data frames. """ __slots__ = ["buf"] def __init__(self, message: bytes = b"") -> None: self.buf = io.BytesIO(message) @classmethod def from_frame(cls, frame: _DataFrame) -> "Message": message = cls() message.buf.write(frame.data) return message def add_frame(self, frame: _DataFrame) -> None: # pragma: no cover raise NotImplementedError(f"{type(self).__name__} does not implement add_frame()") def to_stream(self, stream: _BufferedStream) -> None: """Write this message to the output stream. """ output = self.get_output() header = _DataFrameHeader(fin=True, opcode=OPCODES_BY_MESSAGE[type(self)], length=len(output)) frame = _DataFrame(header, output) # type: ignore frame.to_stream(stream) def get_data(self) -> bytes: """Get this message's data as a bytestring. """ return self.buf.getvalue() def get_text(self) -> str: """Get this message's contents as text. """ try: return self.buf.getvalue().decode("utf-8") except UnicodeDecodeError as e: raise WebsocketProtocolError("Invalid UTF-8 payload.") from None def get_output(self) -> Union[bytes, bytearray, memoryview]: """Get this message's output payload. CloseMessage hooks into this to prepend the status code to the payload. """ return self.buf.getbuffer() class CloseMessage(Message): """Received (or sent) when the connection should be closed. Close messages sent by the client are automatically handled by receive(). Attributes: code(int): The close status code. """ __slots__ = ["buf", "code"] def __init__(self, code: int = 1000, reason: str = "") -> None: self.buf = io.BytesIO(reason.encode("utf-8")) self.code = code @classmethod def from_frame(cls, frame: _DataFrame) -> "Message": code = 1000 if frame.data: code_data, frame.data = frame.data[:2], frame.data[2:] if len(code_data) < 2: raise WebsocketProtocolError("Expected status code in close message payload.") code = struct.unpack("!H", code_data)[0] if code < 1000 or code > 4999: raise WebsocketProtocolError(f"Invalid status code {code}.") elif code in RESERVED_STATUS_CODES: raise WebsocketProtocolError(f"Status code {code} is reserved.") elif code < 3000 and code not in VALID_STATUS_CODES: raise WebsocketProtocolError(f"Invalid status code {code}.") message = cls() message.code = code message.buf.write(frame.data) return message def get_output(self) -> Union[bytes, bytearray, memoryview]: return struct.pack("!H", self.code) + self.buf.getvalue() class BinaryMessage(Message): """A message containing binary data. """ def add_frame(self, frame: _DataFrame) -> None: if len(self.buf.getbuffer()) + len(frame.data) > MAX_MESSAGE_SIZE: raise WebsocketProtocolError(f"Message exceeds {MAX_MESSAGE_SIZE} bytes.") self.buf.write(frame.data) class TextMessage(BinaryMessage): """A message containing text data. """ def __init__(self, message: str = "") -> None: super().__init__(message.encode("utf-8")) class PingMessage(Message): """A PING message. These are automatically handled by receive(). """ class PongMessage(Message): """A PONG message. These are automatically handled by receive(). """ #: A mapping from message classes to opcodes. OPCODES_BY_MESSAGE = { CloseMessage: OP_CLOSE, BinaryMessage: OP_BINARY, TextMessage: OP_TEXT, PingMessage: OP_PING, PongMessage: OP_PONG, } class Websocket: """Represents a single websocket connection. These are used for bi-directional communication with a websocket client. Websockets are *not* thread-safe. Example: >>> from molten import annotate >>> from molten.contrib.websockets import CloseMessage, Websocket >>> @annotate(supports_ws=True) ... def echo(sock: Websocket): ... while not sock.closed: ... message = sock.receive() ... if isinstance(message, CloseMessage): ... break ... ... sock.send(message) Attributes: closed(bool): Whether or not this socket has been closed. """ __slots__ = ["closed", "stream"] def __init__(self, stream: _BufferedStream) -> None: self.closed = False self.stream = stream def receive(self, *, timeout: Optional[float] = None) -> Optional[Message]: """Waits for a message from the client for up to *timeout* seconds. """ if self.closed: return None with gevent.Timeout(timeout): message = None while True: frame = _DataFrame.from_stream(self.stream) if frame.header.opcode == OP_TEXT: if message is not None: raise WebsocketProtocolError("Unexpected text frame.") message = TextMessage.from_frame(frame) elif frame.header.opcode == OP_BINARY: if message is not None: raise WebsocketProtocolError("Unexpected binary frame.") message = BinaryMessage.from_frame(frame) elif frame.header.opcode == OP_CONTINUATION: if message is None: raise WebsocketProtocolError("Unexpected continuation frame.") message.add_frame(frame) elif frame.header.opcode == OP_CLOSE: if not frame.header.fin: raise WebsocketProtocolError("Close frame is not final.") message = CloseMessage.from_frame(frame) self.close(CloseMessage(reason=message.get_text())) return message elif frame.header.opcode == OP_PING: if not frame.header.fin: raise WebsocketProtocolError("Ping frame is not final.") self.send(PongMessage(frame.data)) continue elif frame.header.opcode == OP_PONG: if not frame.header.fin: raise WebsocketProtocolError("Pong frame is not final.") continue else: raise WebsocketProtocolError(f"Unexpected frame with opcode 0x{frame.header.opcode:x}.") if frame.header.fin: return message def send(self, message: Message) -> None: """Send a message to the client. """ if self.closed: raise WebsocketClosedError("Websocket already closed.") message.to_stream(self.stream) def close(self, message: Optional[Message] = None) -> None: """Close this websocket and send a close message to the client. Note: This does not close the underlying websocket as it's better to let gunicorn handle that by itself. """ try: self.send(message or CloseMessage()) self.stream.close() except WebsocketClosedError: pass finally: self.closed = True class _WebsocketComponent: """Resolves websocket objects. Users of this module don't need to worry about providing this to the App object as the middleware does it automatically. """ __slots__ = ["websocket"] is_cacheable = True is_singleton = False def __init__(self, websocket: Websocket) -> None: self.websocket = websocket def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Websocket def resolve(self) -> Websocket: return self.websocket class WebsocketsMiddleware: """A middleware that handles websocket upgrades. Warning: Please note that this functionality is currently gunicorn-specific and it requires the use of async workers in order to function correctly. Parameters: origin_re: An optional regular expression that can be used to validate the origin of incoming browser requests. """ __slots__ = ["origin_re"] def __init__(self, origin_re: Optional[Pattern[str]] = None) -> None: self.origin_re = origin_re def handle_exception(self, exception: BaseException, websocket: Websocket) -> None: """Called whenever an unhandled exception occurs in middleware or a handler. Overwrite this in a subclass to implement custom error handling for websocket handlers. If you do overwrite this, don't forget to close the websocket connection when necessary. """ LOGGER.exception("Unhandled error from websocket handler.") if issubclass(type(exception), WebsocketProtocolError): websocket.close(CloseMessage(1002, str(exception))) elif issubclass(type(exception), WebsocketFrameTooLargeError): websocket.close(CloseMessage(1009, str(exception))) else: websocket.close(CloseMessage(1011, "Internal server error.")) def __call__(self, handler: Callable[..., Any]) -> Callable[..., Response]: def handle( resolver: DependencyResolver, request: Request, environ: Environ, route: Optional[Route], ) -> Response: if route is None or not getattr(route.handler, "supports_ws", False): return handler() try: connection = request.headers["connection"] upgrade = request.headers["upgrade"] websocket_key = request.headers["sec-websocket-key"] websocket_version = request.headers["sec-websocket-version"] except HeaderMissing as e: raise HTTPError(HTTP_400, {"errors": {str(e): "this header is required"}}) try: origin = request.headers["origin"] except HeaderMissing: origin = "" if self.origin_re and not self.origin_re.match(origin): raise HTTPError(HTTP_400, {"error": "invalid origin"}) if "upgrade" not in connection.lower() or "websocket" not in upgrade.lower(): raise HTTPError(HTTP_400, {"error": "invalid upgrade request"}) if websocket_version not in SUPPORTED_VERSIONS: return Response(HTTP_426, headers={"sec-websocket-version": SUPPORTED_VERSIONS_STR}) # TODO: Implement extension handling. # TODO: Implement subprotocol handling. stream = _BufferedStream(environ["gunicorn.socket"]) stream.write(UPGRADE_RESPONSE_TEMPLATE % { b"websocket_accept": b64encode(sha1(f"{websocket_key}{PSK}".encode()).digest()), }) websocket = Websocket(stream) resolver.add_component(_WebsocketComponent(websocket)) try: handler() except Exception as e: handle_exception = resolver.resolve(self.handle_exception) handle_exception(exception=e) finally: websocket.close(CloseMessage()) raise RequestHandled("websocket request was upgraded") return handle class _WebsocketsTestConnection: """A proxy context manager for websocket objects. """ __slots__ = ["__future", "__socket"] def __init__(self, future: Future, socket: Websocket) -> None: # type: ignore self.__future = future self.__socket = socket def close(self) -> None: try: self.__socket.send(CloseMessage()) except WebsocketClosedError: pass finally: self.__future.result() def __enter__(self) -> "_WebsocketsTestConnection": return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.close() def __getattr__(self, name: str) -> Any: return getattr(self.__socket, name) class WebsocketsTestClient(TestClient): """This is a subclass of the standard test client that adds an additional method called :meth:`.connect` that may be used to connect to websocket endpoints. Example: >>> client = WebsocketsTestClient(app) >>> with client.connect("/echo") as sock: ... sock.send(TextMessage("hi!")) ... assert sock.receive(timeout=1).get_text() == "hi!" Note: In order for :meth:`receive's` "timeout" parameter to work, you need use gevent to monkeypatch sockets before running your tests. """ def __init__(self, app: BaseApp) -> None: self.app = app self.executor = ThreadPoolExecutor(max_workers=8) def connect( self, path: str, headers: Optional[Union[HeadersDict, Headers]] = None, params: Optional[Union[ParamsDict, QueryParams]] = None, auth: Optional[Callable[[Request], Request]] = None, ) -> _WebsocketsTestConnection: """Initiate a websocket connection against the application. Parameters: path: The request path. headers: Optional request headers. params: Optional query params. auth: An optional function that can be used to add auth headers to the request. """ headers = headers or Headers() headers["connection"] = "upgrade" headers["upgrade"] = "websocket" headers["sec-websocket-key"] = b64encode(b"a" * 16).decode() headers["sec-websocket-version"] = "13" client_sock, server_sock = socket.socketpair() def prepare_environ(environ: Environ) -> Environ: nonlocal client_sock environ["gunicorn.socket"] = client_sock return environ # Execute the websocket handler in a background thread because # it may block while waiting on the socket. Keep a reference # to it so we can keep track of exceptions that occur in the # handler. future = self.executor.submit( self.get, path, headers, params, auth=auth, prepare_environ=prepare_environ, ) # Delete the client sock so that, if the future completes w/o # upgrading the request, it'll get freed (closed) and we're # not stuck waiting for a response below. del client_sock # Consume the upgrade response and make sure it looks right. # This is kind of piggy, but it should be fine. response_data = b"" while b"\r\n\r\n" not in response_data and future.running(): data = server_sock.recv(CHUNKSIZE) if not data: break response_data += data expected_response = UPGRADE_RESPONSE_TEMPLATE % { b"websocket_accept": b"3SC6TZx4582OZaOogPVxMx5CGS0=", } if not response_data == expected_response: raise ValueError(f"Invalid upgrade response: {response_data}. Did you connect() to a standard endpoint?") websocket = Websocket(_BufferedStream(server_sock)) return _WebsocketsTestConnection(future, websocket) PK!,  molten/dependency_injection.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import functools from inspect import Parameter, signature from typing import Any, Callable, Dict, Iterable, List, Optional, TypeVar, no_type_check from typing_extensions import Protocol from .errors import DIError _T = TypeVar("_T", covariant=True) class Component(Protocol[_T]): # pragma: no cover """The component protocol. Examples: >>> class DBComponent: ... is_cacheable = True ... is_singleton = True ... ... def can_handle_parameter(self, parameter: Parameter) -> bool: ... return parameter.annotation is DB ... ... def resolve(self, settings: Settings) -> DB: ... return DB(settings["database_dsn"]) """ @property def is_cacheable(self) -> bool: """If True, then the component will be cached within a resolver meaning that instances of the resolved component will be reused within a single request-response cycle. This should be True for most components. Defaults to True. """ @property def is_singleton(self) -> bool: """If True, then the component will be treated as a singleton and cached forever after its first use. Defaults to False. """ def can_handle_parameter(self, parameter: Parameter) -> bool: """Returns True when parameter represents the desired component. """ @no_type_check def resolve(self) -> _T: """Returns an instance of the component. """ class DependencyInjector: """The dependency injector maintains component state and instantiates the resolver. Parameters: components: The list of components that are used to resolve functions' dependencies. """ __slots__ = [ "components", "singletons", ] components: List[Component[Any]] singletons: Dict[Component[Any], Any] def __init__(self, components: List[Component[Any]], singletons: Optional[Dict[Component[Any], Any]] = None) -> None: self.components = components or [] self.singletons = singletons or {} for component in components: if getattr(component, "is_singleton", False) and component not in self.singletons: resolver = self.get_resolver() resolved_component = resolver.resolve(component.resolve) self.singletons[component] = resolved_component() def get_resolver(self, instances: Optional[Dict[Any, Any]] = None) -> "DependencyResolver": """Get the resolver for this Injector. """ return DependencyResolver( self.components, {**self.singletons, **(instances or {})}, ) class DependencyResolver: """The resolver does the work of actually filling in all of a function's dependencies. """ __slots__ = [ "components", "instances", ] def __init__(self, components: List[Component[Any]], instances: Dict[Component[Any], Any]) -> None: self.components = components[:] self.instances = instances def add_component(self, component: Component[Any]) -> None: """Add a component to this resolver without adding it to the base dependency injector. This is useful for runtime-built components like RouteParamsComponent. """ self.components.append(component) def resolve( self, fn: Callable[..., Any], resolving_parameter: Optional[Parameter] = None, ) -> Callable[..., Any]: """Resolve a function's dependencies. """ @functools.wraps(fn) def resolved_fn(**params: Any) -> Any: for parameter in _get_parameters(fn): if parameter.name in params: continue # When Parameter is requested then we assume that the # caller actually wants the parameter that resolved the # current component that's being resolved. See QueryParam # or Header components for an example. if parameter.annotation is Parameter: params[parameter.name] = resolving_parameter continue # When a DependencyResolver is requested, then we assume # the caller wants this instance. if parameter.annotation is DependencyResolver: params[parameter.name] = self continue # If our instances contains an exact match for a type, # then we return that. This is used to inject the current # Request object among other things. try: params[parameter.name] = self.instances[parameter.annotation] continue except KeyError: pass for component in self.components: if component.can_handle_parameter(parameter): try: params[parameter.name] = self.instances[component] except KeyError: factory = self.resolve(component.resolve, resolving_parameter=parameter) params[parameter.name] = instance = factory() if getattr(component, "is_cacheable", True): self.instances[component] = instance break else: raise DIError(f"cannot resolve parameter {parameter} of function {fn}") return fn(**params) return resolved_fn @functools.lru_cache(maxsize=128) def _get_parameters(fn: Callable[..., Any]) -> Iterable[Parameter]: # A significant amount of time is spent getting handlers' params. # Since they never change, it should be safe to just cache 'em. return signature(fn).parameters.values() PK!7ї molten/errors.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Dict class MoltenError(Exception): """Base class for all Molten exceptions. """ __slots__ = ["status"] def __init__(self, message: str) -> None: self.message = message def __str__(self) -> str: return self.message class DIError(MoltenError): """Raised when a dependency cannot be resolved. """ class HTTPError(MoltenError): """Base class for HTTP errors. Handlers and middleware can raise these to short-circuit execution. """ __slots__ = ["status", "response", "headers"] def __init__(self, status: str, response: Any, headers: Any = None) -> None: self.status = status self.response = response self.headers = headers or {} def __str__(self) -> str: # pragma: no cover return self.status class RouteNotFound(MoltenError): """Raised when trying to reverse route to a route that doesn't exist. """ class RouteParamMissing(MoltenError): """Raised when a param is missing while reversing a route. """ class RequestHandled(MoltenError): """Signals to the WSGI implementation that start_response was already called within a request handler. This is useful when implementing connection upgrades. """ class RequestParserNotAvailable(MoltenError): """Raised when no request parser can handle the incoming request. """ class ParseError(MoltenError): """Raised by parsers when the input data cannot be parsed. """ class FieldTooLarge(ParseError): """Raised by MultiPartParser when a field exceeds the maximum field size limit. """ class FileTooLarge(ParseError): """Raised by MultiPartParser when a file exceeds the maximum file size limit. """ class TooManyFields(ParseError): """Raised by MultiPartParser when the input contains too many fields. """ class HeaderMissing(MoltenError): """Raised by Headers.__getitem__ when a header does not exist. """ class ParamMissing(MoltenError): """Raised by QueryParams.__getitem__ when a param is missing. """ class ValidationError(MoltenError): """Raised by validator.load when the input data is invalid. """ def __init__(self, reasons: Dict[str, Any]) -> None: self.reasons = reasons def __str__(self) -> str: # pragma: no cover return str(self.reasons) class FieldValidationError(MoltenError): """Raised by Field.validate when a given value is invalid. """ PK!5m3molten/helpers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from enum import Enum, auto from .http import HTTP_301, HTTP_302, HTTP_307, HTTP_308, Response class RedirectType(Enum): TEMPORARY = auto() PERMANENT = auto() #: A mapping from redirect types to HTTP/1.0 status codes. LEGACY_REDIRECT_CODES = { RedirectType.TEMPORARY: HTTP_302, RedirectType.PERMANENT: HTTP_301, } #: A mapping from redirect types to HTTP/1.1 status codes. The #: advantage of these codes is the request method is preserved during #: the redirect. MODERN_REDIRECT_CODES = { RedirectType.TEMPORARY: HTTP_307, RedirectType.PERMANENT: HTTP_308, } def redirect( target_location: str, *, redirect_type: RedirectType = RedirectType.TEMPORARY, use_modern_codes: bool = True, ) -> Response: """Construct an HTTP Response to redirect the client elsewhere. Parameters: target_location: Where the client should be redirected to. redirect_type: PERMANENT or TEMPORARY. use_modern_codes: Whether or not to use HTTP/1.1 response codes. The advantage to using HTTP/1.1 codes is the request method is preserved during redirect, but older clients (IE11 and older) might not support them. """ if use_modern_codes: status_code = MODERN_REDIRECT_CODES[redirect_type] else: status_code = LEGACY_REDIRECT_CODES[redirect_type] return Response(status_code, headers={ "Location": target_location }) PK!,mmolten/http/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .cookies import Cookie, Cookies from .headers import Headers from .query_params import QueryParams from .request import Request from .response import Response, StreamingResponse from .status_codes import * from .uploaded_file import UploadedFile __all__ = [ "Request", "Response", "StreamingResponse", "Headers", "QueryParams", "Cookies", "Cookie", "UploadedFile", # 1xx "HTTP_100", "HTTP_101", "HTTP_102", # 2xx "HTTP_200", "HTTP_201", "HTTP_202", "HTTP_203", "HTTP_204", "HTTP_205", "HTTP_206", "HTTP_207", "HTTP_208", # 3xx "HTTP_300", "HTTP_301", "HTTP_302", "HTTP_303", "HTTP_304", "HTTP_305", "HTTP_307", "HTTP_308", # 4xx "HTTP_400", "HTTP_401", "HTTP_402", "HTTP_403", "HTTP_404", "HTTP_405", "HTTP_406", "HTTP_407", "HTTP_408", "HTTP_409", "HTTP_410", "HTTP_411", "HTTP_412", "HTTP_413", "HTTP_414", "HTTP_415", "HTTP_416", "HTTP_417", "HTTP_418", "HTTP_421", "HTTP_422", "HTTP_423", "HTTP_424", "HTTP_426", "HTTP_428", "HTTP_429", "HTTP_431", "HTTP_444", "HTTP_451", "HTTP_499", # 5xx "HTTP_500", "HTTP_501", "HTTP_502", "HTTP_503", "HTTP_504", "HTTP_505", "HTTP_506", "HTTP_507", "HTTP_508", "HTTP_510", "HTTP_511", "HTTP_599", ] PK!ϰzmolten/http/cookies.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from datetime import datetime, timedelta from typing import Dict, Optional, Union from urllib.parse import parse_qsl, urlencode class Cookies(Dict[str, str]): """A dictionary of request cookies. """ @classmethod def parse(cls, cookie_header: str) -> "Cookies": """Turn a cookie header into a Cookies instance. """ cookies = cls() cookie_strings = cookie_header.split(";") for cookie in cookie_strings: for name, value in parse_qsl(cookie.lstrip()): cookies[name] = value return cookies class Cookie: """An individual response cookie. Raises: ValueError: If the value of same_site is not 'strict' or 'lax'. """ __slots__ = [ "name", "value", "max_age", "expires", "domain", "path", "secure", "http_only", "same_site", ] def __init__( self, name: str, value: str, max_age: Optional[Union[int, float, timedelta]] = None, expires: Optional[Union[int, float, datetime]] = None, domain: Optional[str] = None, path: Optional[str] = None, secure: bool = False, http_only: bool = False, same_site: Optional[str] = None, ) -> None: self.name = name self.value = value self.max_age = max_age self.expires = expires self.domain = domain self.path = path self.secure = secure self.http_only = http_only self.same_site = same_site if same_site and same_site != "strict" and same_site != "lax": raise ValueError("same_site must be either 'strict' or 'lax' or None") def encode(self) -> str: """Convert this cookie to a set-cookie header-compatible string. """ output = [urlencode({self.name: self.value})] if self.max_age is not None: if isinstance(self.max_age, timedelta): duration = int(self.max_age.total_seconds()) else: duration = int(self.max_age) output.append(f"Max-Age={duration}") if self.expires is not None: if isinstance(self.expires, (int, float)): expiration_date = datetime.utcfromtimestamp(self.expires) else: expiration_date = self.expires output.append(f"Expires={_format_cookie_date(expiration_date)}") if self.domain is not None: output.append(f"Domain={self.domain}") if self.path is not None: output.append(f"Path={self.path}") if self.secure: output.append("Secure") if self.http_only: output.append("HttpOnly") if self.same_site is not None: output.append(f"SameSite={self.same_site.title()}") return "; ".join(output) _COOKIE_DATE_DAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] _COOKIE_DATE_MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] def _format_cookie_date(date: datetime) -> str: """Formats a cookie expiration date according to [1]. [1]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date """ tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_wday, *_ = date.utctimetuple() day = _COOKIE_DATE_DAYS[tm_wday] month = _COOKIE_DATE_MONTHS[tm_mon - 1] return f"{day}, {tm_mday:02d}-{month}-{tm_year} {tm_hour:02d}:{tm_min:02d}:{tm_sec:02d} GMT" PK!!molten/http/headers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from collections import defaultdict from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union from ..errors import HeaderMissing from ..typing import Environ #: An alias representing a dictionary of headers. HeadersDict = Dict[str, Union[str, List[str]]] #: WSGI keeps these separate from other headers. CONTENT_VARS = {"CONTENT_LENGTH", "CONTENT_TYPE"} class Headers(Iterable[Tuple[str, str]]): """A mapping from case-insensitive header names to lists of values. """ __slots__ = ["_headers"] def __init__(self, mapping: Optional[HeadersDict] = None) -> None: self._headers: Dict[str, List[str]] = defaultdict(list) self.add_all(mapping or {}) @classmethod def from_environ(cls, environ: Environ) -> "Headers": """Construct a Headers instance from a WSGI environ. """ headers = {} for name, value in environ.items(): if name in CONTENT_VARS: headers[name.replace("_", "-")] = value elif name.startswith("HTTP_"): headers[_parse_environ_header(name)] = [value] return cls(headers) def add(self, header: str, value: Union[str, List[str]]) -> None: """Add values for a particular header. """ if isinstance(value, list): self._headers[header.lower()].extend(value) else: self._headers[header.lower()].append(value) def add_all(self, mapping: HeadersDict) -> None: """Add a group of headers. """ for header, value_or_values in mapping.items(): self.add(header, value_or_values) def get(self, header: str, default: Optional[str] = None) -> Optional[str]: """Get the last value for a given header. """ try: return self[header] except HeaderMissing: return default def get_all(self, header: str) -> List[str]: """Get all the values for a given header. """ return self._headers[header.lower()] def get_int(self, header: str, default: Optional[int] = None) -> Optional[int]: """Get the last value for a given header as an integer. """ try: return int(self[header]) except HeaderMissing: return default def __delitem__(self, header: str) -> None: """Delete all the values for a given header. """ del self._headers[header.lower()] def __getitem__(self, header: str) -> str: """Get the last value for a given header. Raises: HeaderMissing: When the header is missing. """ try: return self._headers[header.lower()][-1] except IndexError: raise HeaderMissing(header) def __setitem__(self, header: str, value: str) -> None: """Replace a header's values. """ self._headers[header.lower()] = [value] def __iter__(self) -> Iterator[Tuple[str, str]]: """Iterate over all the headers. """ for header, values in self._headers.items(): for value in values: yield header, value def __repr__(self) -> str: mapping = ", ".join(f"{repr(name)}: {repr(value)}" for name, value in self._headers.items()) return f"Headers({{{mapping}}})" #: The number of characters that are stripped from the beginning of #: every header name in a WSGI environ. HEADER_PREFIX_LEN = len("HTTP_") #: A lookup table from WSGI header strings to header names. HEADER_PARSER_CACHE: Dict[str, str] = {} def _parse_environ_header(header: str) -> str: try: return HEADER_PARSER_CACHE[header] except KeyError: HEADER_PARSER_CACHE[header] = parsed_header = header[HEADER_PREFIX_LEN:].replace("_", "-") return parsed_header PK!;"  molten/http/query_params.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Dict, List, Optional, Union from urllib.parse import parse_qsl from ..common import MultiDict from ..errors import ParamMissing from ..typing import Environ ParamsDict = Dict[str, Union[str, List[str]]] class QueryParams(MultiDict[str, str]): """A mapping from param names to lists of values. Once constructed, these instances cannot be modified. """ @classmethod def from_environ(cls, environ: Environ) -> "QueryParams": """Construct a QueryParams instance from a WSGI environ. """ return cls.parse(environ.get("QUERY_STRING", "")) @classmethod def parse(cls, query_string: str) -> "QueryParams": """Construct a QueryParams instance from a query string. """ return cls(parse_qsl(query_string)) def get(self, name: str, default: Optional[str] = None) -> Optional[str]: """Get the last value for a given key. """ try: return self[name] except ParamMissing: return default def __getitem__(self, name: str) -> str: """Get the last value for a given key. Raises: ParamMissing: When the key is missing. """ try: return self._data[name][-1] except IndexError: raise ParamMissing(name) PK!  molten/http/request.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from io import BytesIO from typing import BinaryIO, Dict, List, Optional, Union from ..typing import Environ from .headers import Headers from .query_params import ParamsDict, QueryParams class Request: """Represents an individual HTTP request. Attributes: method: The HTTP method. scheme: The URL scheme. host: The hostname. port: The port. path: The path. params: The query parameters. headers: The request headers. body_file: A file-like object representing the request body. """ __slots__ = [ "method", "scheme", "host", "port", "path", "params", "headers", "body_file", ] def __init__( self, *, method: str = "GET", scheme: str = "http", host: str = "127.0.0.1", port: int = 8000, path: str = "/", params: Optional[Union[ParamsDict, QueryParams]] = None, headers: Optional[Union[Dict[str, Union[str, List[str]]], Headers]] = None, body_file: Optional[BinaryIO] = None, ) -> None: self.method = method self.scheme = scheme self.host = host self.port = port self.path = path self.body_file = body_file or BytesIO() if isinstance(headers, dict): self.headers: Headers = Headers(headers) else: self.headers = headers or Headers() if not params: self.params = QueryParams() elif isinstance(params, dict): self.params = QueryParams(params) else: self.params = params @classmethod def from_environ(cls, environ: Environ) -> "Request": """Construct a Request object from a WSGI environ. """ return Request( method=environ["REQUEST_METHOD"], scheme=environ["wsgi.url_scheme"], host=environ.get("HTTP_HOST", ""), port=environ.get("SERVER_PORT", 0), path=environ.get("PATH_INFO", ""), params=QueryParams.from_environ(environ), headers=Headers.from_environ(environ), body_file=environ["wsgi.input"], ) def __repr__(self) -> str: return ( f"Request(method={repr(self.method)}, scheme={repr(self.scheme)}, host={repr(self.host)}, " f"port={repr(self.port)}, path={repr(self.path)}, params={repr(self.params)}, " f"headers={repr(self.headers)}, body_file={repr(self.body_file)})" ) PK!FFmolten/http/response.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import io import os from typing import BinaryIO, Generator, Optional, Union, cast from .cookies import Cookie from .headers import Headers, HeadersDict class Response: """An HTTP response. Parameters: status: The status line of the response. headers: Optional response headers. content: Optional response content as a string. stream: Optional response content as a file-like object. encoding: An optional encoding for the response. """ __slots__ = [ "status", "headers", "stream", ] def __init__( self, status: str, headers: Optional[Union[HeadersDict, Headers]] = None, content: Optional[str] = None, stream: Optional[BinaryIO] = None, encoding: str = "utf-8", ) -> None: self.status = status if isinstance(headers, dict): self.headers = Headers(headers) else: self.headers = headers or Headers() if content is not None: self.stream: BinaryIO = io.BytesIO(content.encode(encoding)) elif stream is not None: self.stream: BinaryIO = stream else: self.stream: BinaryIO = io.BytesIO() def get_content_length(self) -> Optional[int]: """Compute the content length of this response. """ content_length = self.headers.get_int("content-length") if content_length is None: try: stream_stat = os.fstat(self.stream.fileno()) content_length = stream_stat.st_size except OSError: old_position = self.stream.tell() try: self.stream.seek(0, os.SEEK_END) content_length = self.stream.tell() finally: self.stream.seek(old_position, os.SEEK_SET) return content_length def set_cookie(self, cookie: Cookie) -> None: """Add a cookie to this response. """ self.headers.add("set-cookie", cookie.encode()) def __repr__(self) -> str: return f"Response(status={repr(self.status)}, headers={repr(self.headers)})" class StreamingResponse(Response): """A chunked HTTP response, yielding content from a generator. Parameters: status: The status line of the response. content: A response content generator. headers: Optional response headers. encoding: An optional encoding for the response. """ def __init__( self, status: str, content: Generator[bytes, None, None], headers: Optional[Union[HeadersDict, Headers]] = None, encoding: str = "utf-8", ) -> None: super().__init__( status=status, headers=headers, stream=cast(BinaryIO, GenStream(content)), encoding=encoding, ) self.headers.add("transfer-encoding", "chunked") def get_content_length(self) -> Optional[int]: """Compute the content length of this response. Streaming responses can't know their length up front so this always returns None. """ return None class GenStream: """A file-like object backed by a generator. """ __slots__ = ["gen", "buff"] def __init__(self, gen: Generator[bytes, None, None]) -> None: self.gen = gen self.buff = b"" def read(self, n: int) -> bytes: try: self.buff += next(self.gen) data, self.buff = self.buff[:n], self.buff[n:] return data except StopIteration: return b"" PK! molten/http/status_codes.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # 1xx HTTP_100 = "100 Continue" HTTP_101 = "101 Switching Protocols" HTTP_102 = "102 Processing" # 2xx HTTP_200 = "200 OK" HTTP_201 = "201 Created" HTTP_202 = "202 Accepted" HTTP_203 = "203 Non-Authoritative Information" HTTP_204 = "204 No Content" HTTP_205 = "205 Reset Content" HTTP_206 = "206 Partial Content" HTTP_207 = "207 Multi-Status" HTTP_208 = "208 Already Reported" # 3xx HTTP_300 = "300 Multiple Choices" HTTP_301 = "301 Moved Permanently" HTTP_302 = "302 Found" HTTP_303 = "303 See Other" HTTP_304 = "304 Not Modified" HTTP_305 = "305 Use Proxy" HTTP_307 = "307 Temporary Redirect" HTTP_308 = "308 Permanent Redirect" # 4xx HTTP_400 = "400 Bad Request" HTTP_401 = "401 Unauthorized" HTTP_402 = "402 Payment Required" HTTP_403 = "403 Forbidden" HTTP_404 = "404 Not Found" HTTP_405 = "405 Method Not Allowed" HTTP_406 = "406 Not Acceptable" HTTP_407 = "407 Proxy Authentication Required" HTTP_408 = "408 Request Timeout" HTTP_409 = "409 Conflict" HTTP_410 = "410 Gone" HTTP_411 = "411 Length Required" HTTP_412 = "412 Precondition Failed" HTTP_413 = "413 Payload Too Large" HTTP_414 = "414 Request-URI Too Long" HTTP_415 = "415 Unsupported Media Type" HTTP_416 = "416 Requested Range Not Satisfiable" HTTP_417 = "417 Expectation Failed" HTTP_418 = "418 I'm a teapot" HTTP_421 = "421 Misdirected Request" HTTP_422 = "422 Unprocessable Entity" HTTP_423 = "423 Locked" HTTP_424 = "424 Failed Dependency" HTTP_426 = "426 Upgrade Required" HTTP_428 = "428 Precondition Required" HTTP_429 = "429 Too Many Requests" HTTP_431 = "431 Request Header Fields Too Large" HTTP_444 = "444 Connection Closed Without Response" HTTP_451 = "451 Unavailable For Legal Reasons" HTTP_499 = "499 Client Closed Request" HTTP_415 = "415 Unsupported Media Type" # 5xx HTTP_500 = "500 Internal Server Error" HTTP_501 = "501 Not Implmeneted" HTTP_502 = "502 Bad Gateway" HTTP_503 = "503 Service Unavailable" HTTP_504 = "504 Gateway Timeout" HTTP_505 = "505 HTTP Version Not Supported" HTTP_506 = "506 Variant Also Negotiates" HTTP_507 = "507 Insufficient Storage" HTTP_508 = "508 Loop Detected" HTTP_510 = "510 Not Extended" HTTP_511 = "511 Network Authentication Required" HTTP_599 = "599 Network Connect Timeout Error" PK!!!molten/http/uploaded_file.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from shutil import copyfileobj from typing import Any, BinaryIO, Union from .headers import Headers class UploadedFile: """Represents a file that was uploaded as part of an HTTP request. May be backed by an in-memory file-like object or a real temporary file on disk. Attributes: filename: The name the file had in the request. headers: Headers sent with the file. stream: The file-like object containing the data. """ __slots__ = [ "filename", "headers", "stream", ] def __init__(self, filename: str, headers: Headers, stream: BinaryIO) -> None: self.filename = filename self.headers = headers self.stream = stream def save(self, destination: Union[str, BinaryIO]) -> None: """Save the file's contents either to another file object or to a path on disk. """ if isinstance(destination, str): with open(destination, "wb+") as outfile: copyfileobj(self.stream, outfile) else: copyfileobj(self.stream, destination) def __getattr__(self, name: str) -> Any: return getattr(self.stream, name) def __repr__(self) -> str: params = ", ".join(f"{name}={getattr(self, name)!r}" for name in self.__slots__) return f"UploadedFile({params})" PK!1i  molten/middleware.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Callable from .app import BaseApp from .errors import HeaderMissing, HTTPError from .http import HTTP_200, HTTP_406, Request, Response from .http.headers import HeadersDict class ResponseRendererMiddleware: """A middleware that renders responses. """ def __call__(self, handler: Callable[..., Any]) -> Callable[..., Response]: def handle(app: BaseApp, request: Request) -> Response: try: headers: HeadersDict = {} response = handler() if isinstance(response, Response): return response elif isinstance(response, tuple): if len(response) == 2: status, response = response elif len(response) == 3: status, response, headers = response else: raise RuntimeError("Response tuple must be (status, data) or (status, data, headers).") else: status, response = HTTP_200, response except HTTPError as e: status, response, headers = e.status, e.response, e.headers try: accept_header = request.headers["accept"] except HeaderMissing: accept_header = "*/*" for mime in accept_header.split(","): mime, _, _ = mime.partition(";") for renderer in app.renderers: if mime == "*/*" or renderer.can_render_response(mime): response = renderer.render(status, response) response.headers.add_all(headers) return response return Response(HTTP_406, content="Not Acceptable") return handle PK!Sk\\molten/openapi/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .documents import ( APIKeySecurityScheme, Contact, HTTPSecurityScheme, License, Metadata, generate_openapi_document ) from .handlers import OpenAPIHandler, OpenAPIUIHandler __all__ = [ "OpenAPIHandler", "OpenAPIUIHandler", # OpenAPI Objects "Contact", "License", "Metadata", "APIKeySecurityScheme", "HTTPSecurityScheme", "generate_openapi_document", ] PK!㔰BBmolten/openapi/documents.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # Here be Dragons. OpenAPI and especially JSONSchema are not very # good specs, but at least they're popular! import ast import inspect from collections import defaultdict from operator import itemgetter from textwrap import dedent from types import FunctionType, MethodType from typing import ( Any, Callable, Dict, List, Optional, Set, Tuple, Union, get_type_hints, no_type_check ) from typing_inspect import get_args, get_origin, is_generic_type, is_typevar from ..app import BaseApp from ..router import get_route_parameters from ..typing import Header, QueryParam, extract_optional_annotation from ..validation import Field, dump_schema, field, is_schema, schema @schema class Contact: """Contact information for the exposed API. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#contact-object """ name: Optional[str] = None url: Optional[str] = None email: Optional[str] = None @schema class License: """License information for the exposed API. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#license-object """ name: str url: Optional[str] = None @schema class Metadata: """Metadata about the exposed API. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#infoObject """ title: str description: str version: str terms_of_service: Optional[str] = field(response_name="termsOfService", default=None) contact: Optional[Contact] = None license: Optional[License] = None @schema class Schema: """Describes the type and attributes of a value. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#schemaObject """ type: Optional[str] = None format: Optional[str] = None # noqa description: Optional[str] = None required: Optional[List[str]] = None properties: Optional[Dict[str, Any]] = None nullable: Optional[bool] = None all_of: Optional[List[Dict[str, str]]] = field(response_name="allOf", default=None) any_of: Optional[List[Dict[str, str]]] = field(response_name="anyOf", default=None) items: Optional[Dict[str, str]] = None choices: Optional[List[str]] = field(response_name="enum", default=None) pattern: Optional[str] = None minimum: Optional[Union[int, float]] = None maximum: Optional[Union[int, float]] = None multiple_of: Optional[Union[int, float]] = field(response_name="multipleOf", default=None) min_length: Optional[int] = field(response_name="minLength", default=None) max_length: Optional[int] = field(response_name="maxLength", default=None) read_only: Optional[bool] = field(response_name="readOnly", default=None) write_only: Optional[bool] = field(response_name="writeOnly", default=None) @schema class Parameter: """Describes a single handler parameter. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#parameterObject """ name: str in_: str = field(response_name="in", choices=["query", "header", "path", "cookie"]) description: Optional[str] = None required: Optional[bool] = None deprecated: bool = False schema: Optional[Schema] = None @schema class APIKeySecurityScheme: """Describes an API key-based security scheme. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#securitySchemeObject """ name: str in_: str = field(choices=["query", "header", "cookie"]) type: str = field(response_only=True, default="apiKey") @schema class HTTPSecurityScheme: """Describes an HTTP-based security scheme. https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#securitySchemeObject """ name: str scheme: str = field(choices=["basic", "bearer"]) type: str = field(response_only=True, default="http") #: The union of acceptable security schemes. SecurityScheme = Union[ APIKeySecurityScheme, HTTPSecurityScheme, ] @no_type_check # noqa def generate_openapi_document( app: BaseApp, metadata: Metadata, security_schemes: List[SecurityScheme], default_security_scheme: Optional[str] = None, ) -> Dict[str, Any]: """Generate an OpenAPI v3 document from an application object and some metadata. """ request_mime_types = {parser.mime_type for parser in app.parsers} response_mime_types = {renderer.mime_type for renderer in app.renderers} schemas: Dict[str, Schema] = {} paths: Dict[str, Dict[str, Any]] = defaultdict(dict) for method, routes in app.router._routes_by_method.items(): method = method.lower() for route in routes: handler = route.handler if not isinstance(handler, (FunctionType, MethodType)): handler = handler.__call__ # type: ignore parameters = [] request_schema_name = None annotations = get_type_hints(handler) route_template_parameters = get_route_parameters(route.template) for name, annotation in annotations.items(): if name == "return": continue is_optional, annotation = extract_optional_annotation(annotation) if name in route_template_parameters: parameters.append(Parameter( name, "path", description=_get_annotation(handler, f"param_{name}_description"), required=True, schema=_generate_primitive_schema(annotation), )) elif annotation is QueryParam: parameters.append(Parameter( name, "query", description=_get_annotation(handler, f"param_{name}_description"), required=not is_optional, schema=Schema("string"), )) elif annotation is Header: parameters.append(Parameter( name.replace("_", "-"), "header", description=_get_annotation(handler, f"param_{name}_description"), required=not is_optional, schema=Schema("string"), )) elif is_schema(annotation): request_schema_name = _generate_schema(annotation, schemas) operation = paths[route.template][method] = { "tags": _get_annotation(handler, "tags", []), "operationId": route.name, "description": dedent(handler.__doc__ or "").rstrip(), "parameters": [dump_schema(param, sparse=True) for param in parameters], "deprecated": _get_annotation(handler, "deprecated", False), "responses": {"200": {"description": "A successful response.", "content": {}}}, } if request_schema_name is not None: operation["requestBody"] = {"content": {}} for media_type in request_mime_types: operation["requestBody"]["content"][media_type] = { "schema": _make_schema_ref(request_schema_name), } # Sort the media types for tools like the Swagger UI. operation["requestBody"]["content"] = _sort_dict(operation["requestBody"]["content"]) # When the response annotation is a tuple of the form # [str, ...] then we assume that the responses will # contain custom status codes followed by the response # objects themselves. response_annotation = annotations.get("return") response_annotation_origin = get_origin(response_annotation) if response_annotation is not None and response_annotation_origin in _TUPLE_TYPES: arguments = _get_args(response_annotation) if len(arguments) == 2 and arguments[0] is str and is_schema(arguments[1]): response_annotation = arguments[1] if response_annotation is not None: if is_schema(response_annotation): response_schema_name = _generate_schema(response_annotation, schemas) for media_type in response_mime_types: operation["responses"]["200"]["content"][media_type] = { "schema": _make_schema_ref(response_schema_name), } elif response_annotation_origin in _LIST_TYPES: arguments = _get_args(response_annotation) if is_schema(arguments[0]): response_schema_name = _generate_schema(arguments[0], schemas) for media_type in response_mime_types: operation["responses"]["200"]["content"][media_type] = { "schema": { "type": "array", "items": _make_schema_ref(response_schema_name), } } status_codes = _extract_status_codes(handler) for status_code in status_codes: status_code_ob = operation["responses"][str(status_code)] = {} if status_code < 300 and status_code != 204: status_code_ob.update(**operation["responses"]["200"]) description = _get_annotation(handler, f"response_{status_code}_description") status_code_ob.update(description=description) # If the declared return type is a response-code-tuple and # the status code finder couldn't find a 200 status code, # then it should be safe to drop that code from the # responses object. if get_origin(annotations.get("return")) in _TUPLE_TYPES and 200 not in status_codes: del operation["responses"]["200"] # TODO: Add support for OAuth2 security scheme. # TODO: Make it possible to annotate that a handler # doesn't require a security scheme. if default_security_scheme is not None: operation["security"] = [{default_security_scheme: []}] return { "openapi": "3.0.1", "info": dump_schema(metadata, sparse=True), "paths": _sort_dict(paths), "components": { "schemas": _sort_dict({name: dump_schema(schema, sparse=True) for name, schema in schemas.items()}), "securitySchemes": _sort_dict({scheme.name: dump_schema(scheme, sparse=True) for scheme in security_schemes}), }, } @no_type_check def _generate_schema(schema: Any, schemas: Dict[str, Schema]) -> str: name = f"{schema.__module__}.{schema.__name__}" if name in schemas: return name definition = Schema( "object", description=schema.__doc__, properties={}, required=[], ) for field in schema._FIELDS.values(): # noqa if field.request_name == field.response_name: field_names = [field.request_name] else: field_names = [field.request_name, field.response_name] for field_name in field_names: is_optional, field_schema = _generate_field_schema(field_name, field, schemas) if field_schema is not None: definition.properties[field_name] = field_schema if not is_optional: definition.required.append(field_name) schemas[name] = definition return name @no_type_check def _generate_field_schema(field_name: str, field: Field, schemas: Dict[str, Schema]) -> Tuple[bool, Schema]: is_optional, annotation = extract_optional_annotation(field.annotation) if is_schema(annotation): field_schema_name = _generate_schema(annotation, schemas) field_schema = Schema(all_of=[_make_schema_ref(field_schema_name)]) elif is_generic_type(annotation): origin = get_origin(annotation) if origin in _LIST_TYPES: arguments = _get_args(annotation) if arguments and is_schema(arguments[0]): item_schema_name = _generate_schema(arguments[0], schemas) field_schema = Schema("array", items=_make_schema_ref(item_schema_name)) else: field_schema = _generate_primitive_schema(annotation) elif origin in _DICT_TYPES: # TODO: Add support for additionalFields. field_schema = _generate_primitive_schema(dict) else: # pragma: no cover raise ValueError(f"Unsupported type {origin} for field {field.name!r}.") else: field_schema = _generate_primitive_schema(annotation) if field_schema is not None: field_schema.description = field.description if field.request_name != field.response_name: if field_name == field.request_name: field_schema.write_only = True else: field_schema.read_only = True elif field.response_only: field_schema.read_only = True elif field.request_only: field_schema.write_only = True for option, value in field.validator_options.items(): if option in Schema._FIELDS: setattr(field_schema, option, value) return is_optional, field_schema @no_type_check def _generate_primitive_schema(annotation: Any) -> Optional[Schema]: try: arguments = _PRIMITIVE_ANNOTATION_MAP[annotation] return Schema(*arguments) except KeyError: origin = get_origin(annotation) if origin in _LIST_TYPES: arguments = _get_args(annotation) if not arguments or is_typevar(arguments[0]): return Schema("array", items=_ANY_VALUE) else: return Schema("array", items=_generate_primitive_schema(arguments[0])) # TODO: Add support for additionalFields. return Schema("string") def _extract_status_codes(handler: Callable[..., Any]) -> List[int]: try: source = inspect.getsource(handler) finder = _StatusCodeFinder() return finder.find(ast.parse(dedent(source))) except OSError: # pragma: no cover return [] def _make_schema_ref(name: str) -> Dict[str, str]: return {"$ref": f"#/components/schemas/{name}"} def _get_annotation(handler: Callable[..., Any], name: str, default: Any = None) -> Any: return getattr(handler, f"openapi_{name}", default) def _get_args(annotation: Any) -> Any: # This is a safe version of get_args that works the same on Python # 3.6 and 3.7 by ensuring that expanded type arguments are merged # into their original type. arguments = list(get_args(annotation)) for i, argument in enumerate(arguments[:]): if isinstance(argument, tuple): arguments[i] = argument[0][argument[1:]] return arguments def _sort_dict(data: Dict[Any, Any]) -> Dict[Any, Any]: # This relies on the ordered dict implementation in Py3.6+. return dict(sorted(data.items(), key=itemgetter(0))) class _StatusCodeFinder(ast.NodeVisitor): """Finds usages of HTTP_* in an AST. """ def __init__(self) -> None: self.status_codes: Set[int] = set() def find(self, tree: Any) -> List[int]: self.visit(tree) return sorted(list(self.status_codes)) def visit_Name(self, node: Any) -> None: if node.id.startswith("HTTP_"): try: self.status_codes.add(int(node.id[len("HTTP_"):])) except ValueError: # pragma: no cover pass #: Maps primitive types to Schema parameters. _PRIMITIVE_ANNOTATION_MAP = { int: ["integer", "int64"], str: ["string"], bool: ["boolean"], dict: ["object"], float: ["number", "double"], bytes: ["string", "binary"], } #: A schema that accepts any value whatsoever. Used in generic types #: w/o a type annotation (eg. List). _ANY_VALUE = { "description": "Can be any value, including null.", "nullable": True, } _DICT_TYPES = {dict, Dict} _LIST_TYPES = {list, List} _TUPLE_TYPES = {tuple, Tuple} PK!>i) molten/openapi/handlers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Dict, List, Optional import pkg_resources from ..app import BaseApp from ..http import HTTP_200, Request, Response from .documents import Metadata, SecurityScheme, generate_openapi_document class OpenAPIHandler: """Dynamically generates and serves OpenAPI v3 documents based on the current application object. Once generated, the document is subsequently served from cache. Examples: >>> get_schema = OpenAPIHandler( ... metadata=Metadata(title="Pet Store", version="0.1.0"), ... security_schemes=[HTTPSecurityScheme("Bearer", "bearer")], ... default_security_scheme="Bearer", ... ) Parameters: metadata: Various meta-information about the current API. security_schemes: A list of security schemes used throughout the API. """ def __init__( self, metadata: Metadata, security_schemes: Optional[List[SecurityScheme]] = None, default_security_scheme: Optional[str] = None, ) -> None: self.metadata = metadata self.security_schemes = security_schemes or [] self.default_security_scheme = default_security_scheme self.document: Optional[Dict[str, Any]] = None @property def __name__(self) -> str: return type(self).__name__ # type: ignore def __call__(self, app: BaseApp) -> Optional[Dict[str, Any]]: """Generates an OpenAPI v3 document. """ if not self.document: self.document = generate_openapi_document( app, self.metadata, self.security_schemes, self.default_security_scheme, ) return self.document class OpenAPIUIHandler: """Renders the `Swagger UI`_. Parameters: schema_route_name: The name of the route that exposes the schema. The actual path to the schema is looked up whenever the handler is called. .. _Swagger UI: https://github.com/swagger-api/swagger-ui """ def __init__(self, schema_route_name: str = "OpenAPIHandler") -> None: self.schema_route_name = schema_route_name self.template = pkg_resources.resource_string(f"molten.openapi.templates", "index.html").decode("utf-8") @property def __name__(self) -> str: return type(self).__name__ # type: ignore def __call__(self, app: BaseApp, request: Request) -> Response: """Renders the Swagger UI. """ schema_uri = app.reverse_uri(self.schema_route_name) rendered_template = self.template % {"schema_uri": schema_uri} return Response(HTTP_200, content=rendered_template, headers={ "content-type": "text/html", }) PK!$molten/openapi/templates/__init__.pyPK!lfhh#molten/openapi/templates/index.html molten :: docs
PK!ÌJW*W*molten/parsers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import json import os import re from tempfile import SpooledTemporaryFile from typing import Any, Dict, Iterator, List, Tuple, Union, no_type_check from urllib.parse import parse_qsl from typing_extensions import Protocol from .common import MultiDict from .errors import FieldTooLarge, FileTooLarge, ParseError, TooManyFields from .http import Headers, UploadedFile from .typing import Header, RequestBody, RequestInput class RequestParser(Protocol): # pragma: no cover """Protocol for request parsers. """ @property def mime_type(self) -> str: """Returns a string representing the mime type of the rendered content. This is used to generate OpenAPI documents. """ def can_parse_content(self, content_type: str) -> bool: """Returns True if this parser can parse the given content type. """ @no_type_check def parse(self) -> Any: """Attempt to parse the input data. Raises: ParseError: if the data cannot be parsed. """ class JSONParser: """A JSON request parser. """ mime_type = "application/json" def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("application/json") def parse(self, data: RequestBody) -> Any: try: return json.loads(data) except json.JSONDecodeError: raise ParseError("JSON input could not be parsed") class URLEncodingParser: """A parser for urlencoded requests. """ mime_type = "application/x-www-form-urlencoded" def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("application/x-www-form-urlencoded") def parse(self, data: RequestBody) -> MultiDict[str, str]: try: return MultiDict(parse_qsl(data.decode("utf-8"), strict_parsing=True)) except ValueError: raise ParseError("failed to parse urlencoded data") class MultiPartParser: """A parser for multipart requests. Returns a MultiDict mapping field names to lists of field string values or UploadedFiles. This is a reasonably simple streaming parser implementation for the multipart/form-data media type. As such, it does not support deprecated parts of RFC7578 like multipart/mixed content and content-transfer-encoding headers. Parameters: bufsize: The max size of the streaming data buffer. This should be a 32 bit integer that's a multiple of 4. In some cases, the streaming data buffer may contain double this amount so take that into account when choosing a value. Additionally, the value should be greater than the longest individual header value you want to accept. encoding: The codec to use when decoding form field values. encoding_errors: What to do when an decoding error is encountered. max_field_size: The max number of bytes a field can contain. max_file_size: The max number of bytes a file can contain. max_num_fields: The max number of fields accepted per request. max_spooled_size: The max number of bytes a file in the request can have before it's written to a temporary file on disk. """ __slots__ = [ "bufsize", "encoding", "encoding_errors", "max_field_size", "max_file_size", "max_num_fields", "max_spooled_size", ] BOUNDARY_RE = re.compile("boundary=(.+)") PARAMS_RE = re.compile('([A-Za-z]+)="([^"]+)"') mime_type = "multipart/form-data" def __init__( self, *, bufsize: int = 64 * 1024, encoding: str = "utf-8", encoding_errors: str = "replace", max_field_size: int = 500 * 1024, max_file_size: int = 10 * 1024 * 1024, max_num_fields: int = 100, max_spooled_size: int = 1024 * 1024, ) -> None: self.bufsize = bufsize self.encoding = encoding self.encoding_errors = encoding_errors self.max_field_size = max_field_size self.max_file_size = max_file_size self.max_num_fields = max_num_fields self.max_spooled_size = max_spooled_size def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("multipart/form-data") def parse(self, content_type: Header, content_length: Header, body_file: RequestInput) -> MultiDict[str, Union[str, UploadedFile]]: # noqa matches = self.BOUNDARY_RE.search(content_type) if not matches: raise ParseError("boundary missing from content-type header") boundary = matches.group(1) lines = self._iter_lines(body_file, boundary, int(content_length)) parts = self._iter_parts(lines, boundary) return MultiDict(parts) def _iter_lines(self, stream: RequestInput, boundary: str, limit: int) -> Iterator[bytes]: buff = b"" remaining = limit while remaining > 0: data = stream.read(self.bufsize) remaining -= len(data) if not data: return buff += data if remaining > 0 and len(buff) < self.bufsize: continue while buff: try: i = buff.index(b"\r\n") except ValueError: break line, buff = buff[:i + 2], buff[i + 2:] yield line if len(buff) >= self.bufsize and not buff.endswith(b"\r"): yield buff buff = b"" def _iter_parts(self, lines: Iterator[bytes], boundary: str) -> Iterator[Tuple[str, Union[str, UploadedFile]]]: next_part = f"--{boundary}\r\n".encode() last_part = f"--{boundary}--\r\n".encode() def prepare_current_part() -> Tuple[str, Union[str, UploadedFile]]: nonlocal total_field_count headers = Headers(current_part_headers) name = current_part_disposition["name"] value: Union[str, UploadedFile] if "filename" in current_part_disposition: # Strip CRLF from the end of the file and then rewind. current_part_container.seek(-2, os.SEEK_END) current_part_container.truncate() headers.add("content-length", str(current_part_container.tell())) current_part_container.seek(0) filename = current_part_disposition["filename"] value = UploadedFile(filename, headers, current_part_container) else: # Strip CRLF from the end of the buffer. data = current_part_container[:-2] value = data.decode(self.encoding, errors=self.encoding_errors) total_field_count += 1 return name, value def append_bytes(data: bytes) -> None: nonlocal current_part_container current_part_container += data total_field_count = 1 current_part_bytes: int = 0 current_part_is_file: bool = False current_part_container: Any = None current_part_writer: Any = None current_part_headers: Dict[str, Union[str, List[str]]] = {} current_part_disposition: Dict[str, str] = {} current_part_past_headers: bool = False for line in lines: if total_field_count > self.max_num_fields: raise TooManyFields("the input contains too many fields") if line == last_part: if current_part_container is not None: yield prepare_current_part() break elif line == next_part: if current_part_container is not None: yield prepare_current_part() current_part_bytes = 0 current_part_is_file = False current_part_container = None current_part_writer = None current_part_headers = {} current_part_disposition = {} current_part_past_headers = False elif not current_part_past_headers: line = line.rstrip() if not line: if current_part_container is None: raise ParseError("content-disposition header is missing") current_part_past_headers = True continue header_name, _, header_value = line.decode().partition(": ") current_part_headers[header_name] = header_value if header_name.lower() == "content-disposition": current_part_disposition = dict(self.PARAMS_RE.findall(header_value)) if "name" not in current_part_disposition: raise ParseError("content-disposition header without a name") if "filename" in current_part_disposition: current_part_is_file = True current_part_container = SpooledTemporaryFile(mode="wb+", max_size=self.max_spooled_size) current_part_writer = current_part_container.write else: current_part_is_file = False current_part_container = b"" current_part_writer = append_bytes else: current_part_bytes += len(line) if current_part_is_file and current_part_bytes >= self.max_file_size: message = f"file '{current_part_disposition['name']}' exceeds the file size limit" raise FileTooLarge(message) elif not current_part_is_file and current_part_bytes >= self.max_field_size: message = f"field '{current_part_disposition['name']}' exceeds the field size limit" raise FieldTooLarge(message) current_part_writer(line) else: raise ParseError("unexpected end of input") PK!qU4molten/py.typed# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # This package uses inline types as per PEP561.PK! molten/renderers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import json from typing import Any from typing_extensions import Protocol from .http import Response from .validation import dump_schema, is_schema class ResponseRenderer(Protocol): # pragma: no cover """Protocol for response renderers. """ @property def mime_type(self) -> str: """Returns a string representing the mime type of the rendered content. This is used to generate OpenAPI documents. """ def can_render_response(self, accept: str) -> bool: """Returns True if this renderer can render data for the given mime type. """ def render(self, status: str, response_data: Any) -> Response: """Attempt to render the response data. """ class JSONRenderer: """A JSON response renderer. """ mime_type = "application/json" def can_render_response(self, accept: str) -> bool: return accept.startswith("application/json") def render(self, status: str, response_data: Any) -> Response: content = json.dumps(response_data, default=self.default) return Response(status, content=content, headers={ "content-type": "application/json; charset=utf-8", }) def default(self, ob: Any) -> Any: """You may override this when subclassing the JSON renderer in order to encode non-standard object types. """ if is_schema(type(ob)): return dump_schema(ob) raise TypeError(f"cannot encode values of type {type(ob)}") # pragma: no cover PK!o?molten/router.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import re from collections import defaultdict from typing import Any, Callable, Dict, Iterator, List, Optional, Pattern, Set, Tuple, Union from .errors import RouteNotFound, RouteParamMissing #: Alias for things that can be added to a router. RouteLike = Union["Route", "Include"] class Route: """An individual route. Examples: >>> Route("/accounts", list_accounts) >>> Route("/accounts", create_account, method="POST") >>> Route("/accounts/{account_id}", get_account) Parameters: template: A route template. handler: The request handler for this route. method: The request method. name: An optional name for the route. Used in calls to reverse_uri. Defaults to the name of the handler. """ __slots__ = [ "template", "handler", "method", "name", ] def __init__(self, template: str, handler: Callable[..., Any], method: str = "GET", name: Optional[str] = None) -> None: self.template = template self.handler = handler self.method = method self.name = name or handler.__name__ class Include: """Groups of routes prefixed by a common path. Examples: >>> Include("/v1/accounts", [ ... Route("/", create_account, method="POST"), ... Route("/", list_accounts), ... Route("/{account_id}", get_account), ... ], namespace="accounts") Parameters: prefix: The path that each route will be prefixed with. routes: The list of routes to include. namespace: An optional prefix that will be prepended to each route's name. This is useful to avoid conflicts if your handlers have similar names. """ __slots__ = [ "prefix", "routes", "namespace", ] def __init__(self, prefix: str, routes: List[RouteLike], *, namespace: Optional[str] = None) -> None: self.prefix = prefix self.routes = routes self.namespace = namespace class Router: """A collection of routes. """ __slots__ = [ "_routes_by_name", "_routes_by_method", "_route_res_by_method", ] def __init__(self, routes: Optional[List[RouteLike]] = None) -> None: self._routes_by_name: Dict[str, Route] = {} self._routes_by_method: Dict[str, List[Route]] = defaultdict(list) self._route_res_by_method: Dict[str, List[Pattern[str]]] = defaultdict(list) self.add_routes(routes or []) def add_route(self, route_like: RouteLike, prefix: str = "", namespace: Optional[str] = None) -> None: """Add a Route to this instance. """ if isinstance(route_like, Include): self.add_routes( route_like.routes, prefix=prefix + route_like.prefix, namespace=f"{namespace}:{route_like.namespace}" if namespace else route_like.namespace, ) elif isinstance(route_like, Route): if route_like.name in self._routes_by_name: raise ValueError(f"a route named {route_like.name} is already registered") route_name = route_like.name if namespace: route_name = f"{namespace}:{route_name}" route = Route( template=prefix + route_like.template, handler=route_like.handler, method=route_like.method, name=route_name, ) self._routes_by_name[route_name] = route self._routes_by_method[route.method].insert(0, route) self._route_res_by_method[route.method].insert(0, compile_route_template(route.template)) else: # pragma: no cover raise NotImplementedError(f"unhandled type {type(route_like)}") def add_routes(self, route_likes: List[RouteLike], prefix: str = "", namespace: Optional[str] = None) -> None: """Add a set of routes to this instance. """ for route_like in route_likes: self.add_route(route_like, prefix, namespace) def match(self, method: str, path: str) -> Union[None, Tuple[Route, Dict[str, str]]]: """Look up the route matching the given method and path. Returns the route and any path params that were extracted from the path. """ routes = self._routes_by_method[method] route_res = self._route_res_by_method[method] for route, route_re in zip(routes, route_res): match = route_re.match(path) if match is not None: return route, match.groupdict() return None def reverse_uri(self, route_name: str, **params: str) -> str: """Build a URI from a Route. Raises: RouteNotFound: When the route doesn't exist. RouteParamMissing: When a required parameter was not provided. Parameters: route_name: The name of the route to reverse. \**params: Route params used to build up the path. """ try: route = self._routes_by_name[route_name] except KeyError: raise RouteNotFound(route_name) uri = [] for kind, token in tokenize_route_template(route.template): if kind == "binding" or kind == "glob": try: uri.append(str(params[token])) except KeyError: raise RouteParamMissing(token) elif kind == "chunk": uri.append(token) return "".join(uri) def compile_route_template(template: str) -> Pattern[str]: """Convert a route template into a regular expression. """ re_template = "" for kind, token in tokenize_route_template(template): if kind == "binding": re_template += f"(?P<{token}>[^/]+)" elif kind == "glob": re_template += f"(?P<{token}>.+)" elif kind == "chunk": re_template += token.replace(".", r"\.") else: # pragma: no cover raise NotImplementedError(f"unhandled token kind {kind!r}") return re.compile(f"^{re_template}$") def tokenize_route_template(template: str) -> Iterator[Tuple[str, str]]: """Convert a route template into a stream of tokens. """ k, i = 0, 0 while i < len(template): if template[i] == "{": yield "chunk", template[k:i] k = i kind = "binding" if template[i:i + 2] == "{*": kind = "glob" i += 1 for j in range(i + 1, len(template)): if template[j] == "}": yield kind, template[i + 1:j] k = j + 1 i = j break else: raise SyntaxError(f"unmatched {{ in route template {template!r}") i += 1 if k != i: yield "chunk", template[k:i] def get_route_parameters(template: str) -> Set[str]: """Extract all the named route parameters from a route template. """ return {token for kind, token in tokenize_route_template(template) if kind == "binding"} PK!@ @ molten/settings.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from inspect import Parameter from typing import Any, Dict, Optional #: Canary value representing missing values. Missing = object() class Settings(Dict[str, Any]): """A dictionary of settings. """ def deep_get(self, path: str, default: Optional[Any] = None) -> Optional[Any]: """Look up a deeply-nested setting by its path. Examples: >>> settings = Settings({"a": {"b": [{"c": 42}]}}) >>> settings.deep_get("a.b.0.c") 42 Raises: TypeError: When attempting to index into a primitive value or when indexing a list with a string value rather than an integer. Parameters: path: A dot-separated string representing the path to the value. default: The value to return if the path cannot be traversed. """ root = self names = path.split(".") for name in names: if isinstance(root, list): try: root = root[int(name)] except (IndexError, ValueError): raise TypeError(f"invalid index '{name}' for list {root!r}") elif isinstance(root, dict): root = root.get(name, Missing) else: raise TypeError(f"value {root!r} at subpath '{name}' is not a list or a dict") if root is Missing: return default return root def strict_get(self, path: str) -> Any: """Get a required setting. Raises: RuntimeError: If the value for that setting cannot be found. """ value = self.deep_get(path, Missing) if value is Missing: raise RuntimeError(f"Cannot find required setting at path {path!r}.") return value class SettingsComponent: """A component for settings that you build at app init time. Examples: >>> settings = Settings({"database_engine_dsn": "sqlite://"}) >>> app = App(components=[SettingsComponent(settings)]) """ __slots__ = ["settings"] is_cacheable = True is_singleton = True def __init__(self, settings: Settings) -> None: self.settings = settings def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Settings def resolve(self) -> Settings: return self.settings PK!YUeemolten/testing/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .client import TestClient, TestResponse from .common import to_environ __all__ = ["TestClient", "TestResponse", "to_environ"] PK!}Xmolten/testing/client.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import json from functools import partial from io import BytesIO from json import dumps as to_json from typing import Any, BinaryIO, Callable, Dict, Optional, Union, cast from urllib.parse import urlencode from ..app import BaseApp from ..http import HTTP_200 from ..http.headers import Headers, HeadersDict from ..http.query_params import ParamsDict, QueryParams from ..http.request import Request from ..http.response import Response from ..typing import Environ from .common import to_environ try: from gunicorn.http.errors import NoMoreData # type: ignore except ImportError: # pragma: no cover class NoMoreData(Exception): # type: ignore pass HTTP_METHODS = {"delete", "head", "get", "options", "patch", "post", "put"} class TestResponse: """A wrapper around Response objects that adds a few additional helper methods for testing. Attributes: status: The response status line. status_code: The response status code as an integer. headers: The response headers. stream: The response data as a binary file. data: The response data as a string. """ __slots__ = ["_response"] def __init__(self, response: Response) -> None: self._response = response @property def data(self) -> str: """Rewinds the output stream and returns all its data. """ self._response.stream.seek(0) return self._response.stream.read().decode("utf-8") @property def status_code(self) -> int: """Returns the HTTP status code as an integer. """ code, _, _ = self._response.status.partition(" ") return int(code) def json(self) -> Any: """Convert the response data to JSON. """ return json.loads(self.data) def __getattr__(self, name: str) -> Any: return getattr(self._response, name) class TestClient: """Test clients are used to simulate requests against an application instance. """ __slots__ = ["app"] def __init__(self, app: BaseApp) -> None: self.app = app def request( self, method: str, path: str, headers: Optional[Union[HeadersDict, Headers]] = None, params: Optional[Union[ParamsDict, QueryParams]] = None, body: Optional[bytes] = None, data: Optional[Dict[str, str]] = None, json: Optional[Any] = None, auth: Optional[Callable[[Request], Request]] = None, prepare_environ: Optional[Callable[[Environ], Environ]] = None, ) -> TestResponse: """Simulate a request against the application. Raises: RuntimeError: If both 'data' and 'json' are provided. Parameters: method: The request method. path: The request path. headers: Optional request headers. params: Optional query params. body: An optional bytestring for the request body. data: An optional dictionary for the request body that gets url-encoded. json: An optional value for the request body that gets json-encoded. auth: An optional function that can be used to add auth headers to the request. """ if data is not None and json is not None: raise RuntimeError("either 'data' or 'json' should be provided, not both") request = Request( method=method.upper(), path=path, headers=headers, params=params, ) if body is not None: request.headers["content-length"] = f"{len(body)}" request.body_file = BytesIO(body) if data is not None: request_content = urlencode(data).encode("utf-8") request.headers["content-type"] = "application/x-www-form-urlencoded" request.headers["content-length"] = f"{len(request_content)}" request.body_file = BytesIO(request_content) elif json is not None: request_content = to_json(json).encode("utf-8") request.headers["content-type"] = "application/json; charset=utf-8" request.headers["content-length"] = f"{len(request_content)}" request.body_file = BytesIO(request_content) if auth is not None: request = auth(request) response = Response(HTTP_200) def start_response(status, response_headers, exc_info=None): # type: ignore nonlocal response response.status = status response.headers = Headers(dict(response_headers)) try: environ = to_environ(request) if prepare_environ: # pragma: no cover environ = prepare_environ(environ) chunks = self.app(environ, start_response) except NoMoreData: chunks = [] if response.headers.get("transfer-encoding") == "chunked": response.stream = cast(BinaryIO, chunks) return TestResponse(response) for chunk in chunks: response.stream.write(chunk) response.stream.seek(0) return TestResponse(response) def __getattr__(self, name: str) -> Any: if name in HTTP_METHODS: return partial(self.request, name) raise AttributeError(f"unknown attribute {name}") PK!جj;molten/testing/common.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Union from urllib.parse import urlencode from ..http import Headers, QueryParams, Request from ..typing import Environ def to_environ(ob: Union[Request, Headers, QueryParams]) -> Environ: """Convert request abstractions to WSGI environ dicts. """ if isinstance(ob, Request): return { "HTTP_HOST": ob.host, "PATH_INFO": ob.path, "REQUEST_METHOD": ob.method, "SERVER_PORT": ob.port, "wsgi.input": ob.body_file, "wsgi.url_scheme": ob.scheme, **to_environ(ob.params), **to_environ(ob.headers), } elif isinstance(ob, Headers): headers = {f"HTTP_{name.upper().replace('-', '_')}": value for name, value in ob} try: headers["CONTENT_TYPE"] = headers.pop("HTTP_CONTENT_TYPE") except KeyError: pass try: headers["CONTENT_LENGTH"] = headers.pop("HTTP_CONTENT_LENGTH") except KeyError: pass return headers elif isinstance(ob, QueryParams): return {"QUERY_STRING": urlencode(list(ob))} else: # pragma: no cover raise NotImplementedError(f"to_environ cannot handle type {type(ob)}") PK! molten/typing.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, BinaryIO, Callable, Dict, List, NewType, Tuple, Union #: An alias representing a WSGI environment dictionary. Environ = Dict[str, Any] #: An alias representing a WSGI start_response callback. StartResponse = Callable[[str, List[Tuple[str, str]], Any], None] #: The type of middleware functions. Middleware = Callable[[Callable[..., Any]], Callable[..., Any]] #: The request method. Method = NewType("Method", str) #: The request URI scheme. Scheme = NewType("Scheme", str) #: The request hostname. Host = NewType("Host", str) #: The request port. Port = NewType("Port", int) #: The request query string. QueryString = NewType("QueryString", str) #: A query string parameter. QueryParam = NewType("QueryParam", str) #: A header. Header = NewType("Header", str) #: A file-like object representing the request input data. RequestInput = NewType("RequestInput", BinaryIO) #: A bytestring representing the request input data. RequestBody = NewType("RequestBody", bytes) #: Parsed request data. RequestData = NewType("RequestData", Dict[str, Any]) def extract_optional_annotation(annotation: Any) -> Tuple[bool, Any]: """Returns a tuple denoting whether or not the annotation is an Optional type and the inner annotation. """ if is_optional_annotation(annotation): return True, annotation.__args__[0] return False, annotation def is_optional_annotation(annotation: Any) -> bool: """Returns True if the given annotation represents an Optional type. """ try: return getattr(annotation, "__origin__", None) is Union and \ issubclass(annotation.__args__[1], type(None)) except TypeError: # pragma: no cover return False PK!~molten/validation/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .common import Missing, is_schema from .field import Field, Validator, field from .schema import dump_schema, load_schema, schema __all__ = [ "Missing", "Field", "Validator", "field", "schema", "dump_schema", "load_schema", "is_schema", ] PK!\Gmolten/validation/common.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Type class _Missing: """The type of missing values. """ def __repr__(self) -> str: # pragma: no cover return "Missing" #: Canary value representing missing attributes or values. Missing = _Missing() def is_schema(ob: Type[Any]) -> bool: """Returns True if the given type is a schema. """ return isinstance(ob, type) and \ hasattr(ob, "_SCHEMA") and \ hasattr(ob, "_FIELDS") PK!IRBRBmolten/validation/field.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import re from typing import ( Any, Callable, Dict, Generic, List, Optional, Sequence, Type, TypeVar, Union, no_type_check ) from typing_extensions import Protocol from typing_inspect import get_origin, is_generic_type, is_typevar from ..errors import FieldValidationError, ValidationError from ..typing import extract_optional_annotation from .common import Missing, _Missing, is_schema _T = TypeVar("_T") @no_type_check def field(*args, **kwargs) -> Any: """An alias for :class:`.Field` that tricks the type system into submission. """ return Field(*args, **kwargs) class Validator(Protocol[_T]): # pragma: no cover """Validators ensure that values conform to arbitrary specifications. """ def can_validate_field(self, field: "Field[_T]") -> bool: """This should return True if this validator can validate the given Field. """ ... @no_type_check def validate(self, field: "Field[_T]", value: Any, **options: Any) -> _T: """Validate and possibly transform the given value. Raises: FieldValidationError: If the value is not valid. """ ... class Field(Generic[_T]): """An individual field on a schema. The @schema decorator automatically turns annotated attributes into fields, but the field class can also be used to enrich annotated attributes with metadata. Examples: >>> @schema ... class Application: ... name: str ... rating: int = Field(minimum=1, maximum=5) Parameters: name: The name of the field. Automatically populated by the schema decorator. annotation: The field's annotation. Like name, this is automatically populated by @schema. description: An optional description for the field. default: An optional default value for the field. default_factory: An optional default function for the field. request_name: The field's name within a request. This is the same as the field's name by default. response_name: The field's name within a response. This is the same as the field's name by default. request_only: Whether or not to exclude this field from responses. Defaults to False. response_only: Whether or not to ignore this field when loading requests. Defaults to False. allow_coerce: Whether or not values passed to this field may be coerced to the correct type. Defaults to False. validator: The validator to use when loading data. The schema decorator will automatically pick a validator for builtin types. \**validator_options: Arbitrary options passed to the field's validator. """ __slots__ = [ "name", "annotation", "description", "default", "default_factory", "request_name", "response_name", "request_only", "response_only", "allow_coerce", "validator", "validator_options", ] def __init__( self, name: Optional[str] = None, annotation: Optional[Type[_T]] = None, description: Optional[str] = None, default: Union[_T, _Missing] = Missing, default_factory: Optional[Callable[[], _T]] = None, request_name: Optional[str] = None, response_name: Optional[str] = None, request_only: bool = False, response_only: bool = False, allow_coerce: bool = False, validator: Optional[Validator[_T]] = None, **validator_options: Any, ) -> None: self.name = name self.annotation = annotation self.description = description self.default = default self.default_factory = default_factory self.request_name = request_name or name self.response_name = response_name or name self.request_only = request_only self.response_only = response_only self.allow_coerce = allow_coerce self.validator = validator self.validator_options = validator_options def select_validator(self) -> None: """Find a suitable Validator for this field. """ if self.validator is None: self.validator = _select_validator(self) if self.validator_options and not self.validator: raise RuntimeError(f"no validator could be selected for field {self}") @property def has_default(self) -> bool: """Returns True if the field has either a default value or a default factory. """ return self.default is not Missing or self.default_factory is not None @no_type_check def validate(self, value: Optional[Any]) -> _T: """Validate and possibly transform the given value. Raises: FieldValidationError: When the value is not valid. """ is_optional, annotation = extract_optional_annotation(self.annotation) # Distinguishing between missing values and null values is # important. Optional types can have None as a value whereas # types with a default cannot. Additionally, it's possible to # have an optional type without a default value. if value is Missing: if self.default is not Missing: return self.default elif self.default_factory: return self.default_factory() elif is_optional: return None raise FieldValidationError("this field is required") if value is None: if not is_optional: raise FieldValidationError("this field cannot be null") return value if not is_generic_type(annotation) and \ not is_typevar(annotation) and \ not is_schema(annotation) and \ not isinstance(value, annotation): if not self.allow_coerce: raise FieldValidationError(f"unexpected type {type(value).__name__}") try: value = annotation(value) except Exception as e: raise FieldValidationError(f"value could not be coerced to {annotation.__name__}") if self.validator: return self.validator.validate(self, value, **self.validator_options) return value def __repr__(self) -> str: params = ", ".join(f"{name}={repr(getattr(self, name))}" for name in self.__slots__) return f"{type(self).__name__}({params})" class NumberValidator: """Validates numbers. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return annotation is int or annotation is float def validate( self, field: Field[_T], value: Union[int, float], minimum: Optional[Union[int, float]] = None, maximum: Optional[Union[int, float]] = None, multiple_of: Optional[Union[int, float]] = None, ) -> Union[int, float]: if minimum is not None and value < minimum: raise FieldValidationError(f"value must be >= {minimum}") if maximum is not None and value > maximum: raise FieldValidationError(f"value must be <= {maximum}") if multiple_of is not None and value % multiple_of != 0: raise FieldValidationError(f"value must be a multiple of {multiple_of}") return value class StringValidator: """Validates strings. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return annotation is str def validate( self, field: Field[_T], value: str, choices: Optional[Sequence[str]] = None, pattern: Optional[str] = None, min_length: Optional[int] = None, max_length: Optional[int] = None, strip_spaces: bool = False, ) -> str: if choices is not None and value not in choices: raise FieldValidationError(f"must be one of: {', '.join(repr(choice) for choice in choices)}") if pattern is not None and not re.match(pattern, value): raise FieldValidationError(f"must match pattern {pattern!r}") if min_length is not None and len(value) < min_length: raise FieldValidationError(f"length must be >= {min_length}") if max_length is not None and len(value) > max_length: raise FieldValidationError(f"length must be <= {max_length}") if strip_spaces: return value.strip() return value class ListValidator: """Validates lists. When a generic parameter is provided, then the values will be validated against that annotation:: >>> @schema ... class Setting: ... name: str ... value: str >>> @schema ... class Account: ... settings: List[Setting] >>> load_schema(Account, {"settings": [{"name": "a", "value": "b"}]}) Account(settings=[Setting(name="a", value="b")]) >>> load_schema(Account, {"settings": [{"name": "a"}]}) Traceback (most recent call last): ... ValidationError: {"settings": {0: {"value": "this field is required"}}} When a generic parameter isn't provided, then any list is accepted. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return get_origin(annotation) in (list, List) @no_type_check def validate( self, field: Field[_T], value: List[Any], min_items: Optional[int] = None, max_items: Optional[int] = None, item_validator_options: Optional[Dict[str, Any]] = None, ) -> List[Any]: if not isinstance(value, list): raise FieldValidationError("value must be a list") if min_items is not None and len(value) < min_items: raise FieldValidationError(f"length must be >= {min_items}") if max_items is not None and len(value) > max_items: raise FieldValidationError(f"length must be <= {max_items}") # If the argument is Any, then the list can contain anything, # otherwise each item needs to be validated. annotation_args = getattr(field.annotation, "__args__", []) if annotation_args != (Any,): # This is a little piggy but it works well enough in practice. item_validator_options = item_validator_options or {} sub_field = Field(annotation=annotation_args[0], **item_validator_options) sub_field.select_validator() items = [] for i, item in enumerate(value): try: items.append(sub_field.validate(item)) except FieldValidationError as e: raise ValidationError({i: str(e)}) except ValidationError as e: raise ValidationError({i: e.reasons}) return items return value class DictValidator: """Validates dictionaries. When the ``fields`` option is provided, only the declared fields are going to be extracted from the input and will be validated. >>> @schema ... class Account: ... settings: Dict[str, str] = Field(fields={ ... "a": Field(annotation=str), ... }) >>> load_schema(Account, {"settings": {}}) Account(settings={}) >>> load_schema(Account, {"settings": {"a": "b", "c": "d"}}) Account(settings={"settings" {"a": "b"}}) >>> load_schema(Account, {"settings": {"a": 42}}) Traceback (most recent call last): ... ValidationError: {"settings": {"a": "unexpected type int"}} When the ``fields`` option is not provided and the annotation has generic parameters, then the items from the input will be validated against the generic parameter annotations:: >>> @schema ... class Account: ... settings: Dict[str, str] >>> load_schema(Account, {"settings": {}}) Account(settings={}) >>> load_schema(Account, {"settings": {"a": "b"}}) Account(settings={"a": "b"}) >>> load_schema(Account, {"settings": {"a": 42}) # invalid Traceback (most recent call last): ... ValidationError: {"settings": {"a": "unexpected type int"}} When neither ``fields`` or generic parameters are provided, then any dictionary will be accepted. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return get_origin(annotation) in (dict, Dict) @no_type_check def validate( self, field: Field[_T], value: Dict[Any, Any], fields: Optional[Dict[str, Field[Any]]] = None, key_validator_options: Optional[Dict[str, Any]] = None, value_validator_options: Optional[Dict[str, Any]] = None, ) -> Dict[Any, Any]: if not isinstance(value, dict): raise FieldValidationError("value must be a dict") # If a field dictionary was provided then we select specific # items from the input, otherwise we just validate the input. if fields is not None: items = {} for item_name, item_field in fields.items(): try: item_field.select_validator() item_value = value.get(item_name, Missing) items[item_name] = item_field.validate(item_value) except FieldValidationError as e: raise ValidationError({item_name: str(e)}) except ValidationError as e: raise ValidationError({item_name: e.reasons}) return items # If the args are [Any, Any], then the dict can contain # anything, otherwise each item needs to be validated. annotation_args = getattr(field.annotation, "__args__", []) if annotation_args and annotation_args != (Any, Any): key_validator_options = key_validator_options or {} key_field = Field(annotation=annotation_args[0], **key_validator_options) key_field.select_validator() value_validator_options = value_validator_options or {} value_field = Field(annotation=annotation_args[1], **value_validator_options) value_field.select_validator() items = {} for item_name, item_value in value.items(): try: item_name = key_field.validate(item_name) item_value = value_field.validate(item_value) items[item_name] = item_value except FieldValidationError as e: raise ValidationError({item_name: str(e)}) except ValidationError as e: raise ValidationError({item_name: e.reasons}) return items return value class SchemaValidator: """Validates dictionaries against schema classes. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return is_schema(annotation) def validate(self, field: Field[_T], value: Dict[str, Any]) -> Any: from .schema import load_schema _, annotation = extract_optional_annotation(field.annotation) return load_schema(annotation, value) #: The set of built-in validators. Fields will attempt to use one of #: these unless otherwise specified. VALIDATORS: List[Validator[Any]] = [ NumberValidator(), StringValidator(), ListValidator(), DictValidator(), SchemaValidator(), ] def _select_validator(field: Field[_T]) -> Optional[Validator[_T]]: """Find a suitable validator for the given Field. """ for validator in VALIDATORS: if validator.can_validate_field(field): return validator return None PK!- - molten/validation/schema.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Dict, List, Optional, Type, TypeVar, get_type_hints, no_type_check from ..errors import FieldValidationError, ValidationError from .common import Missing, is_schema from .field import Field _T = TypeVar("_T") def schema(cls: Type[_T]) -> Type[_T]: """Construct a schema from a class. Schemas are plain Python classes with automatically-generated ``__init__``, ``__eq__`` and ``__repr__`` methods. They may be used to validate requests and serialize responses. Examples: >>> @schema ... class Account: ... username: str ... password: str = Field(request_only=True) ... is_admin: bool = Field(response_only=True, default=False) >>> load_schema(Account, {}) Traceback (most recent call last): ... ValidationError: {'username': 'this field is required', 'password': 'this field is required'} >>> load_schema(Account, {"username": "example", "password": "secret"}) Account(username='example', password='secret', is_admin=False) >>> dump_schema(load_schema(Account, {"username": "example", "password": "secret"})) {'username': 'example', 'is_admin': False} Raises: RuntimeError: When the attributes are invalid. """ fields = {} for base in cls.__mro__[-1:0:-1]: base_fields = getattr(base, "_FIELDS", {}) for name, field in base_fields.items(): fields[name] = field annotations = get_type_hints(cls) found_default = False for name, annotation in annotations.items(): value = getattr(cls, name, Missing) if value is Missing: value = fields.get(name, value) if isinstance(value, Field): value.name = name value.annotation = annotation value.request_name = value.request_name or name value.response_name = value.response_name or name fields[name] = value else: fields[name] = Field(name=name, annotation=annotation, default=value) # At this point the field instance has an annotation for sure # so it's safe to select a Validator. field = fields[name] field.select_validator() # Make sure fields without a default don't come after fields # with one. if field.has_default: found_default = True elif found_default: raise RuntimeError("attributes without a default cannot follow ones with a default") # Remove the attribute from the class definition. try: if value is not Missing: delattr(cls, name) except AttributeError: pass if not fields: raise RuntimeError(f"schema {cls.__name__} doesn't have any fields") setattr(cls, "__slots__", list(fields)) setattr(cls, "_SCHEMA", True) setattr(cls, "_FIELDS", fields) _add_init(cls, fields) _add_fn(cls, "__eq__", ["self", "other"], _EQ_FN_BODY) _add_fn(cls, "__repr__", ["self"], _REPR_FN_BODY) return cls @no_type_check def load_schema(schema: Type[_T], data: Dict[str, Any]) -> _T: """Validate the given data dictionary against a schema and instantiate the schema. Raises: ValidationError: When the input data is not valid. Parameters: schema: The schema class to validate the data against. data: Data to validate against and populate the schema with. """ if not is_schema(schema): raise TypeError(f"{schema} is not a schema") errors, params = {}, {} for field in schema._FIELDS.values(): if field.response_only: # Response-only fields without an explicit default have to # default to _something_ so we choose None. if not field.has_default: params[field.name] = None continue try: value = data.get(field.request_name, Missing) params[field.name] = field.validate(value) except FieldValidationError as e: errors[field.request_name] = str(e) except ValidationError as e: errors[field.request_name] = e.reasons if errors: raise ValidationError(errors) return schema(**params) @no_type_check def dump_schema(ob: Any, *, sparse: bool = False) -> Dict[str, Any]: """Convert a schema instance into a dictionary. Raises: TypeError: If ob is not a schema instance. Parameters: ob: An instance of a schema. sparse: If true, fields whose values are None are going to be dropped from the output. """ if not is_schema(type(ob)): raise TypeError(f"{ob} is not a schema") data = {} for field in ob._FIELDS.values(): if field.request_only: continue value = getattr(ob, field.name) if is_schema(type(value)): value = dump_schema(value, sparse=sparse) elif isinstance(value, list): value = [dump_schema(item, sparse=sparse) if is_schema(type(item)) else item for item in value] elif isinstance(value, dict): value = {name: dump_schema(item, sparse=sparse) if is_schema(type(item)) else item for name, item in value.items()} if sparse and value is None: continue data[field.response_name] = value return data def _add_fn( cls: Type[Any], name: str, params: List[str], body: List[str], fn_globals: Optional[Dict[str, Any]] = None, fn_locals: Optional[Dict[str, Any]] = None, ) -> None: """Construct a function and add it to a class. """ if name in cls.__dict__: return fn_globals = {"Missing": Missing, **(fn_globals or {})} fn_locals = fn_locals or {} definition = _FN_TEMPLATE.format( name=name, params=", ".join(params), body="\n ".join(body), ) exec(definition, fn_globals, fn_locals) setattr(cls, name, fn_locals[name]) def _add_init(cls: Type[Any], fields: Dict[str, Field[_T]]) -> None: """Construct and add an init function to a schema. """ fn_globals: Dict[str, Any] = {} fn_params = ["self"] fn_body = [] for field in fields.values(): if field.default is not Missing: default_name = f"_{field.name}_default" fn_globals[default_name] = field.default fn_params.append(f"{field.name}=Missing") fn_body.append(f"self.{field.name} = {field.name} if {field.name} is not Missing else {default_name}") elif field.default_factory: factory_name = f"_{field.name}_default_factory" fn_globals[factory_name] = field.default_factory fn_params.append(f"{field.name}=Missing") fn_body.append(f"self.{field.name} = {field.name} if {field.name} is not Missing else {factory_name}()") else: fn_params.append(f"{field.name}") fn_body.append(f"self.{field.name} = {field.name}") _add_fn(cls, "__init__", fn_params, fn_body, fn_globals) _FN_TEMPLATE = """\ def {name}({params}): {body} """.rstrip() _EQ_FN_BODY = """\ try: return all(getattr(self, name) == getattr(other, name) for name in self._FIELDS) except AttributeError: return False """.rstrip().split("\n") _REPR_FN_BODY = """\ params = ', '.join(f'{name}={repr(getattr(self, name))}' for name in self._FIELDS) return f'{type(self).__name__}({params})' """.rstrip().split("\n") PK!HRSTmolten-0.5.2.dist-info/WHEEL A н#J;/"d&F]xzw>@Zpy3F ]n2H%_60{8&baPa>PK!Hamolten-0.5.2.dist-info/METADATATYO@~_1-"(j ) amOU^wmBO;N$‰;+uу=ˮD=ȵrX՟.0@. #Lu J;9C8^Đ\)guA)RȜ+m/ 5&9ȘT +v]MNus!UχX0nQ"// ` (q|:oxVN$쩦H==״mx~bƻugҺP; ;Z@– |s5 mldH(4]ELriD ƶg[^FN8o;ˑ9#fڤa3~EhZGlU~ˬY5-I|͂VgQ}N% !,LJlũDǁ "#̥ekkLs6P@S(M|M-ViOǰX&,o;LxL~Vtn?yDx FuBCsz&Ul{. R51%& xJagOA{.C; 44HWm2I[QfP$ [b^&8j1TJe,Tzb\ -|4:kt${\*#{to0R: snWra~TaId>֘z PK!H6Z]molten-0.5.2.dist-info/RECORDuǖXE- o=@2LX+y̬+"9'uуoK_ufKL8+ "gWuNJ`!!EпyA~,δ0(Z'n-v+*J4R^EPº, a< zG K6Q'zDƾo<}[p@M]`EAh?S GoK̔u-}!v we<$y&ᖴ h՘2)C8C_e7~aC EJ,(tAM/>A_M[OZ`qh/*j~!s0mΑWUX\C81_-x4zޏSAMэ?/'rPfWs{rh<+jaYͧWC/jPE -(29;S+Szv YJ-W8=V(5Xжu^`( /~4n%a\"љ8t9]Zk f8eDVM) M}`HkP au FG%5y^_g,R+IP?xa]lhH{AO*/|kO8A=❢eC?p n`Z7hd.*1vV#y v wFiGݫGjXĤA5P\ `?'դ7 ʎO66+;Xc3O9p-;Vٵ~\OsF)s̹ě" uǫ>ʀ XSܔ+:*EgE-“=shz{{{@ib$KRDg!K]U2v|6GeEa\P(X qGNWe8 `7 IdXo'e3kpmM8-e>ӭ6 э&];WdTCeⳟWۺl0:r}6 L!nk MjkCMS,y.㈺[ tLGS]}_ڡd;g& &NruxrDBn!9JW+֢yw#4 n-wW8ř 6;vwah9&%l-Ȩ \bcTGC(/C6˯~i@+YM7435>ˆ](=ch?kJF#.Dž;LGdyh팃X{S޶Jٶzо? I+]5{w1$~ڶ< ~؜#R[{80*6'7m~Q?#/^7#=\1.!b".$aۂ#*iHaG8Nh쳔" a:v9탻_6̱a?!7Jm|ӶWpmxo &$ķuoNOrGjaN2oG'2i> ~]O[w 8քҹLr*,[=B27}`!1grLV)&o7O>RPb |c6bty*5ZKv -B)f~`voNB] !&m/'s)#5;F[<1h1g/+JNGUu'X<7ySLe %q,'WC/Cs6z{g:N52\Jč#fb-1Lfm7%𺵇 PK!AEmolten/__init__.pyPK!("" 5molten/app.pyPK!B f2molten/common.pyPK!5F@molten/components.pyPK!& ##([molten/contrib/dramatiq.pyPK! BIm m jmolten/contrib/msgpack.pyPK!I8 8 'tmolten/contrib/prometheus.pyPK!nP P molten/contrib/request_id.pyPK!}#molten/contrib/sessions.pyPK!hAcmolten/contrib/sqlalchemy.pyPK!:u-molten/contrib/templates.pyPK!u molten/contrib/toml_settings.pyPK!Maa"molten/contrib/websockets.pyPK!,  70molten/dependency_injection.pyPK!7ї Jmolten/errors.pyPK!5m3XWmolten/helpers.pyPK!,m`molten/http/__init__.pyPK!ϰzgmolten/http/cookies.pyPK!!xmolten/http/headers.pyPK!;"   molten/http/query_params.pyPK!  Pmolten/http/request.pyPK!FFomolten/http/response.pyPK! molten/http/status_codes.pyPK!!!molten/http/uploaded_file.pyPK!1i  molten/middleware.pyPK!Sk\\4molten/openapi/__init__.pyPK!㔰BBmolten/openapi/documents.pyPK!>i) molten/openapi/handlers.pyPK!$%molten/openapi/templates/__init__.pyPK!lfhh#%molten/openapi/templates/index.htmlPK!ÌJW*W*+molten/parsers.pyPK!qU4'Vmolten/py.typedPK! dYmolten/renderers.pyPK!o?qbmolten/router.pyPK!@ @ molten/settings.pyPK!YUeemolten/testing/__init__.pyPK!}Xmolten/testing/client.pyPK!جj;molten/testing/common.pyPK! molten/typing.pyPK!~^molten/validation/__init__.pyPK!\Gwmolten/validation/common.pyPK!IRBRBWmolten/validation/field.pyPK!- - molten/validation/schema.pyPK!HRSTG'molten-0.5.2.dist-info/WHEELPK!Ha'molten-0.5.2.dist-info/METADATAPK!H6Z]*molten-0.5.2.dist-info/RECORDPK.. u3