PK!c,66molten/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .app import App, BaseApp from .dependency_injection import Component, DependencyInjector, DependencyResolver from .errors import ( DIError, FieldTooLarge, FieldValidationError, FileTooLarge, HeaderMissing, HTTPError, MoltenError, ParamMissing, ParseError, RequestParserNotAvailable, RouteNotFound, RouteParamMissing, TooManyFields, ValidationError ) from .http import Cookie, Cookies, Headers, QueryParams, Request, Response, UploadedFile from .http.status_codes import * from .middleware import ResponseRendererMiddleware from .parsers import JSONParser, MultiPartParser, RequestParser, URLEncodingParser from .renderers import JSONRenderer, ResponseRenderer from .router import Include, Route, Router from .testing import TestClient, to_environ from .typing import ( Header, Host, Method, Middleware, Port, QueryParam, QueryString, RequestBody, RequestData, RequestInput, Scheme ) from .validation import Field, Missing, dump_schema, is_schema, load_schema, schema __version__ = "0.0.1" __all__ = [ "BaseApp", "App", "Middleware", # Router "Router", "Route", "Include", # HTTP "Method", "Scheme", "Host", "Port", "QueryString", "QueryParams", "QueryParam", "Headers", "Header", "RequestInput", "RequestBody", "RequestData", "Cookies", "Cookie", "UploadedFile", "Request", "Response", # Dependency-injection "DependencyInjector", "DependencyResolver", "Component", # Parsers "RequestParser", "JSONParser", "URLEncodingParser", "MultiPartParser", # Renderers "ResponseRenderer", "JSONRenderer", # Middleware "ResponseRendererMiddleware", # Validation "Field", "Missing", "schema", "is_schema", "dump_schema", "load_schema", # Errors "MoltenError", "DIError", "HTTPError", "RouteNotFound", "RouteParamMissing", "RequestParserNotAvailable", "ParseError", "FieldTooLarge", "FileTooLarge", "TooManyFields", "HeaderMissing", "ParamMissing", "ValidationError", "FieldValidationError", # Testing "TestClient", "to_environ", # Status codes # 1xx "HTTP_100", "HTTP_101", "HTTP_102", # 2xx "HTTP_200", "HTTP_201", "HTTP_202", "HTTP_203", "HTTP_204", "HTTP_205", "HTTP_206", "HTTP_207", "HTTP_208", # 3xx "HTTP_300", "HTTP_301", "HTTP_302", "HTTP_303", "HTTP_304", "HTTP_305", "HTTP_307", "HTTP_308", # 4xx "HTTP_400", "HTTP_401", "HTTP_402", "HTTP_403", "HTTP_404", "HTTP_405", "HTTP_406", "HTTP_407", "HTTP_408", "HTTP_409", "HTTP_410", "HTTP_411", "HTTP_412", "HTTP_413", "HTTP_414", "HTTP_415", "HTTP_416", "HTTP_417", "HTTP_418", "HTTP_421", "HTTP_422", "HTTP_423", "HTTP_424", "HTTP_426", "HTTP_428", "HTTP_429", "HTTP_431", "HTTP_444", "HTTP_451", "HTTP_499", # 5xx "HTTP_500", "HTTP_501", "HTTP_502", "HTTP_503", "HTTP_504", "HTTP_505", "HTTP_506", "HTTP_507", "HTTP_508", "HTTP_510", "HTTP_511", "HTTP_599", ] PK!L molten/app.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import logging import sys from typing import Any, Callable, Iterable, List, Optional from wsgiref.util import FileWrapper # type: ignore from .components import ( CookiesComponent, HeaderComponent, QueryParamComponent, RequestBodyComponent, RequestDataComponent, RouteParamsComponent, SchemaComponent ) from .dependency_injection import Component, DependencyInjector from .errors import RequestParserNotAvailable from .http import HTTP_204, HTTP_404, HTTP_415, HTTP_500, Headers, QueryParams, Request, Response from .middleware import ResponseRendererMiddleware from .parsers import JSONParser, MultiPartParser, RequestParser, URLEncodingParser from .renderers import JSONRenderer, ResponseRenderer from .router import RouteLike, Router from .typing import ( Environ, Host, Method, Middleware, Port, QueryString, RequestInput, Scheme, StartResponse ) LOGGER = logging.getLogger(__name__) class BaseApp: """Base class for App implementations. """ def __init__( self, routes: Optional[List[RouteLike]] = None, middleware: Optional[List[Middleware]] = None, components: Optional[List[Component[Any]]] = None, parsers: Optional[List[RequestParser]] = None, renderers: Optional[List[ResponseRenderer]] = None, ) -> None: self.router = Router(routes) self.add_route = self.router.add_route self.add_routes = self.router.add_routes self.reverse_uri = self.router.reverse_uri self.parsers = parsers or [ JSONParser(), URLEncodingParser(), MultiPartParser(), ] self.renderers = renderers or [ JSONRenderer(), ] self.middleware = middleware or [ ResponseRendererMiddleware(self.renderers) ] self.components = (components or []) + [ HeaderComponent(), CookiesComponent(), QueryParamComponent(), RequestBodyComponent(), RequestDataComponent(self.parsers), SchemaComponent(), ] self.injector = DependencyInjector(self.components) def handle_404(self) -> Response: """Called whenever a route cannot be found. """ return Response(HTTP_404, content="Not Found") def handle_415(self) -> Response: """Called whenever a request comes in with an unsupported content type. """ return Response(HTTP_415, content="Unsupported Media Type") def handle_exception(self, exception: BaseException) -> Response: """Called whenever an unhandled exception occurs in middleware or a handler. Dependencies are injected into this just like a normal handler. """ LOGGER.exception("An unhandled exception occurred.") return Response(HTTP_500, content="Internal Server Error") def __call__(self, environ: Environ, start_response: StartResponse) -> Iterable[bytes]: # pragma: no cover raise NotImplementedError("apps must implement '__call__'") class App(BaseApp): """An application that implements the WSGI interface. """ def __call__(self, environ: Environ, start_response: StartResponse) -> Iterable[bytes]: request = Request.from_environ(environ) resolver = self.injector.get_resolver({ Request: request, Method: Method(request.method), Scheme: Scheme(request.scheme), Host: Host(request.host), Port: Port(request.port), QueryString: QueryString(environ.get("QUERY_STRING", "")), QueryParams: request.params, Headers: request.headers, RequestInput: RequestInput(request.body_file), }) try: handler: Callable[..., Any] route_and_params = self.router.match(request.method, request.path) if route_and_params is not None: route, params = route_and_params handler = route.handler resolver.add_component(RouteParamsComponent(params)) else: handler = self.handle_404 handler = resolver.resolve(handler) for middleware in reversed(self.middleware): handler = resolver.resolve(middleware(handler)) exc_info = None response = handler() except RequestParserNotAvailable: exc_info = None response = resolver.resolve(self.handle_415)() except Exception as e: exc_info = sys.exc_info() response = resolver.resolve(self.handle_exception)(exception=e) content_length = response.get_content_length() if content_length is not None: response.headers.add("content-length", str(content_length)) start_response(response.status, list(response.headers), exc_info) if response.status != HTTP_204: wrapper = environ.get("wsgi.file_wrapper", FileWrapper) return wrapper(response.stream) else: return [] PK!.> molten/common.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from collections import defaultdict from typing import Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union KT = TypeVar("KT") VT = TypeVar("VT") Mapping = Union[ Dict[KT, Union[VT, List[VT]]], Iterable[Tuple[KT, Union[VT, List[VT]]]] ] class MultiDict(Iterable[Tuple[KT, VT]]): """A mapping from param names to lists of values. Once constructed, these instances cannot be modified. """ __slots__ = ["_data"] def __init__(self, mapping: Optional[Mapping[KT, VT]] = None) -> None: self._data: Dict[KT, List[VT]] = defaultdict(list) self._add_all(mapping or {}) def _add(self, name: KT, value: Union[VT, List[VT]]) -> None: """Add values for a particular key. """ if isinstance(value, list): self._data[name].extend(value) else: self._data[name].append(value) def _add_all(self, mapping: Mapping[KT, VT]) -> None: """Add a group of values. """ items: Iterable[Tuple[KT, Union[VT, List[VT]]]] if isinstance(mapping, dict): items = mapping.items() else: items = mapping for name, value_or_values in items: self._add(name, value_or_values) def get(self, name: KT, default: Optional[VT] = None) -> Optional[VT]: """Get the last value for a given key. """ try: return self[name] except KeyError: return default def get_all(self, name: KT) -> List[VT]: """Get all the values for a given key. """ return self._data[name] def __getitem__(self, name: KT) -> VT: """Get the last value for a given key. Raises: KeyError: When the key is missing. """ try: return self._data[name][-1] except IndexError: raise KeyError(name) def __iter__(self) -> Iterator[Tuple[KT, VT]]: """Iterate over all the parameters. """ for name, values in self._data.items(): for value in values: yield name, value def __repr__(self) -> str: mapping = ", ".join(f"{repr(name)}: {repr(value)}" for name, value in self._data.items()) return f"{type(self).__name__}({{{mapping}}})" PK!3D jjmolten/components.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from inspect import Parameter from typing import Any, Dict, List, Optional, TypeVar from .dependency_injection import DependencyResolver from .errors import ( HeaderMissing, HTTPError, ParamMissing, RequestParserNotAvailable, ValidationError ) from .http import HTTP_400, Cookies, Headers, QueryParams from .parsers import RequestParser from .typing import ( Header, QueryParam, RequestBody, RequestData, RequestInput, extract_optional_annotation ) from .validation import is_schema, load_schema _T = TypeVar("_T") class HeaderComponent: """Retrieves a named header from the request. Examples: def handle(content_type: Header) -> Response: ... def handle(content_type: Optional[Header]) -> Response: ... """ is_cacheable = False is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: _, annotation = extract_optional_annotation(parameter.annotation) return annotation is Header def resolve(self, parameter: Parameter, headers: Headers) -> Optional[str]: is_optional, _ = extract_optional_annotation(parameter.annotation) header_name = parameter.name.replace("_", "-") try: return headers[header_name] except HeaderMissing: if is_optional: return None raise HTTPError(HTTP_400, {header_name: "missing"}) class QueryParamComponent: """Retrieves a named query param from the request. Examples: def handle(x: QueryParam) -> Response: ... def handle(x: Optional[QueryParam]) -> Response: ... """ is_cacheable = False is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: _, annotation = extract_optional_annotation(parameter.annotation) return annotation is QueryParam def resolve(self, parameter: Parameter, params: QueryParams) -> Optional[str]: is_optional, _ = extract_optional_annotation(parameter.annotation) try: return params[parameter.name] except ParamMissing: if is_optional: return None raise HTTPError(HTTP_400, {parameter.name: "missing"}) class RequestBodyComponent: """A component that reads the entire request body into a string. Examples: def handle(body: RequestBody) -> Response: ... """ is_cacheable = True is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is RequestBody def resolve(self, content_length: Header, body_file: RequestInput) -> RequestBody: return RequestBody(body_file.read(int(content_length))) class RequestDataComponent: """A component that parses request data based on the content-type header and the set of registered request parsers. If no request parser is available, then an HTTP 415 response is returned. Examples: def handle(data: RequestData) -> Response: ... """ __slots__ = ["parsers"] is_cacheable = True is_singleton = False def __init__(self, parsers: List[RequestParser]) -> None: self.parsers = parsers def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is RequestData def resolve(self, content_type: Optional[Header], resolver: DependencyResolver) -> RequestData: content_type_str = (content_type or "").lower() for parser in self.parsers: if parser.can_parse_content(content_type_str): return RequestData(resolver.resolve(parser.parse)()) raise RequestParserNotAvailable(content_type_str) class CookiesComponent: """A component that parses request cookies. Examples: def handle(cookies: Cookies) -> Response: cookies["some-cookie"] """ is_cacheable = True is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Cookies def resolve(self, cookie: Optional[Header]) -> Cookies: if cookie is None: return Cookies() return Cookies.parse(cookie) class RouteParamsComponent: """A component that resolves route params. Examples: def handle(name: str, age: int) -> Response: ... app.add_route(Route("/{name}/{age}", handle)) """ __slots__ = ["params"] is_cacheable = False is_singleton = False def __init__(self, params: Dict[str, str]) -> None: self.params = params def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.name in self.params def resolve(self, parameter: Parameter) -> Any: try: return parameter.annotation(self.params[parameter.name]) except (TypeError, ValueError): raise HTTPError(HTTP_400, {parameter.name: f"expected {parameter.annotation.__name__} value"}) class SchemaComponent: """A component that validates request data according to a schema. """ is_cacheable = False is_singleton = False def can_handle_parameter(self, parameter: Parameter) -> bool: return is_schema(parameter.annotation) def resolve(self, parameter: Parameter, data: RequestData) -> Any: try: return load_schema(parameter.annotation, data) except ValidationError as e: raise HTTPError(HTTP_400, {"errors": e.reasons}) PK!fPmolten/contrib/settings.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import os from inspect import Parameter from typing import Any, Optional try: import toml except ImportError: # pragma: no cover raise ImportError("'toml' package missing. Run 'pip install toml'.") #: Canary value representing missing values. Missing = object() class Settings(dict): """A dictionary of settings parsed from a TOML file. """ def deep_get(self, path: str, default: Optional[Any] = None) -> Optional[Any]: """Look up a deeply-nested setting by its path. Raises: TypeError: When attempting to index into a primitive value or when indexing a list with a string value rather than an integer. Parameters: path: A dot-separated string representing the path to the value. default: The value to return if the path cannot be traversed. """ root = self names = path.split(".") for name in names: if isinstance(root, list): try: root = root[int(name)] except (IndexError, ValueError): raise TypeError(f"invalid index '{name}' for list {root!r}") elif isinstance(root, dict): root = root.get(name, Missing) else: raise TypeError(f"value {root!r} at subpath '{name}' is not a list or a dict") if root is Missing: return default return root @classmethod def from_path(cls, path: str, environment: str) -> "Settings": """Load a TOML file into a dictionary. Raises: FileNotFoundError: When the settings file does not exist. """ all_settings = toml.load(open(path)) common_settings = all_settings.get("common", {}) environment_settings = all_settings.get(environment, {}) return Settings({**common_settings, **environment_settings}) class SettingsComponent: """A component that loads settings from a TOML file. The settings file should have a "common" section and one section for each environment the application is expected to run in. The environment-specific settings are merged on top of the "common" settings on load. The current environment is determined by the ``ENVIRONMENT`` config variable. Example settings file:: [common] conn_pooling = true conn_pool_size = 1 [dev] [prod] con_pool_size = 32 """ __slots__ = ["path", "environment"] is_cacheable = True is_singleton = True def __init__(self, path: str = "./settings.toml", environment: Optional[str] = None) -> None: self.path = path self.environment = environment or os.getenv("ENVIRONMENT", "dev") def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Settings def resolve(self) -> Settings: return Settings.from_path(self.path, self.environment) PK!O[bmolten/contrib/templates.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from inspect import Parameter from molten import HTTP_200, Response try: import jinja2 except ImportError: # pragma: no cover raise ImportError("'jinja2' missing. Run 'pip install jinja2'.") class Templates: """Renders jinja2 templates. """ __slots__ = ["environment"] def __init__(self, path: str) -> None: self.environment = jinja2.Environment( loader=jinja2.FileSystemLoader(path), ) def render(self, template_name: str, **context) -> Response: template = self.environment.get_template(template_name) rendered_template = template.render(**context) return Response(HTTP_200, content=rendered_template, headers={ "content-type": "text/html", }) class TemplatesComponent: """A component that builds a jinja2 template renderer. """ __slots__ = ["path"] is_cacheable = True is_singleton = True def __init__(self, path: str) -> None: self.path = path def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Templates def resolve(self) -> Templates: return Templates(self.path) PK!Űumolten/dependency_injection.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import functools import inspect from inspect import Parameter from typing import Any, Callable, Dict, List, Optional, TypeVar, no_type_check from typing_extensions import Protocol from .errors import DIError _T = TypeVar("_T", covariant=True) class Component(Protocol[_T]): # pragma: no cover """The component protocol. """ @property def is_cacheable(self) -> bool: """If True, then the component will be cached within a resolver. This should be True for most components. Defaults to True. """ @property def is_singleton(self) -> bool: """If True, then the component will be treated as a singleton and cached after its first use. Defaults to False. """ def can_handle_parameter(self, parameter: Parameter) -> bool: """Returns True when parameter represents the desired component. """ @no_type_check def resolve(self) -> _T: """Returns an instance of the component. """ class DependencyInjector: """The dependency injector maintains component state and instantiates the resolver. """ __slots__ = [ "components", "singletons", ] components: List[Component[Any]] singletons: Dict[Component[Any], Any] def __init__(self, components: List[Component[Any]]) -> None: self.components = components or [] self.singletons = {} for component in components: if getattr(component, "is_singleton", False) and component not in self.singletons: resolver = self.get_resolver() resolved_component = resolver.resolve(component.resolve) self.singletons[component] = resolved_component() def get_resolver(self, instances: Optional[Dict[Any, Any]] = None) -> "DependencyResolver": """Get the resolver for this Injector. """ return DependencyResolver( self.components, {**self.singletons, **(instances or {})}, ) class DependencyResolver: """The resolver does the work of actually filling in all of a function's dependencies. """ __slots__ = [ "components", "instances", ] def __init__(self, components: List[Component[Any]], instances: Dict[Component[Any], Any]) -> None: self.components = components[:] self.instances = instances def add_component(self, component: Component[Any]) -> None: """Add a component to this resolver without adding it to the base dependency injector. This is useful for runtime-built components like RouteParamsComponent. """ self.components.append(component) def resolve( self, fn: Callable[..., Any], resolving_parameter: Optional[Parameter] = None, ) -> Callable[..., Any]: """Resolve a function's dependencies. """ @functools.wraps(fn) def resolved_fn(**params: Any) -> Any: signature = inspect.signature(fn) for parameter in signature.parameters.values(): if parameter.name in params: continue # When Parameter is requested then we assume that the # caller actually wants the parameter that resolved the # current component that's being resolved. See QueryParam # or Header components for an example. if parameter.annotation is Parameter: params[parameter.name] = resolving_parameter continue # When a DependencyResolver is requested, then we assume # the caller wants this instance. if parameter.annotation is DependencyResolver: params[parameter.name] = self continue # If our instances contains an exact match for a type, # then we return that. This is used to inject the current # Request object among other things. try: params[parameter.name] = self.instances[parameter.annotation] continue except KeyError: pass for component in self.components: if component.can_handle_parameter(parameter): try: params[parameter.name] = self.instances[component] except KeyError: factory = self.resolve(component.resolve, resolving_parameter=parameter) params[parameter.name] = instance = factory() if getattr(component, "is_cacheable", True): self.instances[component] = instance break else: raise DIError(f"cannot resolve parameter {parameter} of function {fn}") return fn(**params) return resolved_fn PK!r;{ { molten/errors.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Dict class MoltenError(Exception): """Base class for all Molten exceptions. """ __slots__ = ["status"] def __init__(self, message: str) -> None: self.message = message def __str__(self) -> str: return self.message class DIError(MoltenError): """Raised when a dependency cannot be resolved. """ class HTTPError(MoltenError): """Base class for HTTP errors. Handlers and middleware can raise these to short-circuit execution. """ __slots__ = ["status", "response"] def __init__(self, status: str, response: Any) -> None: self.status = status self.response = response def __str__(self) -> str: # pragma: no cover return self.status class RouteNotFound(MoltenError): """Raised when trying to reverse route to a route that doesn't exist. """ class RouteParamMissing(MoltenError): """Raised when a param is missing while reversing a route. """ class RequestParserNotAvailable(MoltenError): """Raised when no request parser can handle the incoming request. """ class ParseError(MoltenError): """Raised by parsers when the input data cannot be parsed. """ class FieldTooLarge(ParseError): """Raised by MultiPartParser when a field exceeds the maximum field size limit. """ class FileTooLarge(ParseError): """Raised by MultiPartParser when a file exceeds the maximum file size limit. """ class TooManyFields(ParseError): """Raised by MultiPartParser when the input contains too many fields. """ class HeaderMissing(MoltenError): """Raised by Headers.__getitem__ when a header does not exist. """ class ParamMissing(MoltenError): """Raised by QueryParams.__getitem__ when a param is missing. """ class ValidationError(MoltenError): """Raised by validator.load when the input data is invalid. """ def __init__(self, reasons: Dict[str, Any]) -> None: self.reasons = reasons def __str__(self) -> str: # pragma: no cover return str(self.reasons) class FieldValidationError(MoltenError): """Raised by Field.validate when a given value is invalid. """ PK!fժByymolten/http/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .cookies import Cookie, Cookies from .headers import Headers from .query_params import QueryParams from .request import Request from .response import Response from .status_codes import * from .uploaded_file import UploadedFile __all__ = [ "Request", "Response", "Headers", "QueryParams", "Cookies", "Cookie", "UploadedFile", # 1xx "HTTP_100", "HTTP_101", "HTTP_102", # 2xx "HTTP_200", "HTTP_201", "HTTP_202", "HTTP_203", "HTTP_204", "HTTP_205", "HTTP_206", "HTTP_207", "HTTP_208", # 3xx "HTTP_300", "HTTP_301", "HTTP_302", "HTTP_303", "HTTP_304", "HTTP_305", "HTTP_307", "HTTP_308", # 4xx "HTTP_400", "HTTP_401", "HTTP_402", "HTTP_403", "HTTP_404", "HTTP_405", "HTTP_406", "HTTP_407", "HTTP_408", "HTTP_409", "HTTP_410", "HTTP_411", "HTTP_412", "HTTP_413", "HTTP_414", "HTTP_415", "HTTP_416", "HTTP_417", "HTTP_418", "HTTP_421", "HTTP_422", "HTTP_423", "HTTP_424", "HTTP_426", "HTTP_428", "HTTP_429", "HTTP_431", "HTTP_444", "HTTP_451", "HTTP_499", # 5xx "HTTP_500", "HTTP_501", "HTTP_502", "HTTP_503", "HTTP_504", "HTTP_505", "HTTP_506", "HTTP_507", "HTTP_508", "HTTP_510", "HTTP_511", "HTTP_599", ] PK!vmolten/http/cookies.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from datetime import datetime, timedelta from typing import Dict, Optional, Union from urllib.parse import parse_qsl, urlencode class Cookies(Dict[str, str]): """A set of request cookies. """ @classmethod def parse(cls, cookie_header: str) -> "Cookies": """Turn a cookie header into a Cookies instance. """ cookies = cls() cookie_strings = cookie_header.split(";") for cookie in cookie_strings: for name, value in parse_qsl(cookie.lstrip()): cookies[name] = value return cookies class Cookie: """An individual response cookie. Raises: ValueError: If the value of same_site is not 'strict' or 'lax'. """ __slots__ = [ "name", "value", "max_age", "expires", "domain", "path", "secure", "http_only", "same_site", ] def __init__( self, name: str, value: str, max_age: Optional[Union[int, float, timedelta]] = None, expires: Optional[Union[int, float, datetime]] = None, domain: Optional[str] = None, path: Optional[str] = None, secure: bool = False, http_only: bool = False, same_site: Optional[str] = None, ) -> None: self.name = name self.value = value self.max_age = max_age self.expires = expires self.domain = domain self.path = path self.secure = secure self.http_only = http_only self.same_site = same_site if same_site and same_site != "strict" and same_site != "lax": raise ValueError("same_site must be either 'strict' or 'lax' or None") def encode(self) -> str: """Convert this cookie to a set-cookie header-compatible string. """ output = [urlencode({self.name: self.value})] if self.max_age is not None: if isinstance(self.max_age, timedelta): duration = int(self.max_age.total_seconds()) else: duration = int(self.max_age) output.append(f"Max-Age={duration}") if self.expires is not None: if isinstance(self.expires, (int, float)): expiration_date = datetime.utcfromtimestamp(self.expires) else: expiration_date = self.expires output.append(f"Expires={_format_cookie_date(expiration_date)}") if self.domain is not None: output.append(f"Domain={self.domain}") if self.path is not None: output.append(f"Path={self.path}") if self.secure: output.append("Secure") if self.http_only: output.append("HttpOnly") if self.same_site is not None: output.append(f"SameSite={self.same_site.title()}") return "; ".join(output) _COOKIE_DATE_DAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] _COOKIE_DATE_MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] def _format_cookie_date(date: datetime) -> str: """Formats a cookie expiration date according to [1]. [1]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date """ tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_wday, *_ = date.utctimetuple() day = _COOKIE_DATE_DAYS[tm_wday] month = _COOKIE_DATE_MONTHS[tm_mon - 1] return f"{day}, {tm_mday:02d}-{month}-{tm_year} {tm_hour:02d}:{tm_min:02d}:{tm_sec:02d} GMT" PK!){molten/http/headers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from collections import defaultdict from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union from ..errors import HeaderMissing from ..typing import Environ #: An alias representing a dictionary of headers. HeadersDict = Dict[str, Union[str, List[str]]] #: WSGI keeps these separate from other headers. CONTENT_VARS = {"CONTENT_LENGTH", "CONTENT_TYPE"} class Headers(Iterable[Tuple[str, str]]): """A mapping from case-insensitive header names to lists of values. """ __slots__ = ["_headers"] def __init__(self, mapping: Optional[HeadersDict] = None) -> None: self._headers: Dict[str, List[str]] = defaultdict(list) self.add_all(mapping or {}) @classmethod def from_environ(cls, environ: Environ) -> "Headers": """Construct a Headers instance from a WSGI environ. """ headers = {} for name, value in environ.items(): if name in CONTENT_VARS: headers[name.replace("_", "-")] = value elif name.startswith("HTTP_"): headers[_parse_environ_header(name)] = [value] return cls(headers) def add(self, header: str, value: Union[str, List[str]]) -> None: """Add values for a particular header. """ if isinstance(value, list): self._headers[header.lower()].extend(value) else: self._headers[header.lower()].append(value) def add_all(self, mapping: HeadersDict) -> None: """Add a group of headers. """ for header, value_or_values in mapping.items(): self.add(header, value_or_values) def get(self, header: str, default: Optional[str] = None) -> Optional[str]: """Get the last value for a given header. """ try: return self[header] except HeaderMissing: return default def get_all(self, header: str) -> List[str]: """Get all the values for a given header. """ return self._headers[header.lower()] def get_int(self, header: str, default: Optional[int] = None) -> Optional[int]: """Get the last value for a given header as an integer. """ try: return int(self[header]) except HeaderMissing: return default def __delitem__(self, header: str) -> None: """Delete all the values for a given header. """ del self._headers[header.lower()] def __getitem__(self, header: str) -> str: """Get the last value for a given header. Raises: HeaderMissing: When the header is missing. """ try: return self._headers[header.lower()][-1] except IndexError: raise HeaderMissing(header) def __setitem__(self, header: str, value: str) -> None: """Replace a header's values. """ self._headers[header.lower()] = [value] def __iter__(self) -> Iterator[Tuple[str, str]]: """Iterate over all the headers. """ for header, values in self._headers.items(): for value in values: yield header, value def __repr__(self) -> str: mapping = ", ".join(f"{repr(name)}: {repr(value)}" for name, value in self._headers.items()) return f"Headers({{{mapping}}})" #: The number of characters that are stripped from the beginning of #: every header name in a WSGI environ. HEADER_PREFIX_LEN = len("HTTP_") def _parse_environ_header(header: str) -> str: return header[HEADER_PREFIX_LEN:].replace("_", "-") PK!;"  molten/http/query_params.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Dict, List, Optional, Union from urllib.parse import parse_qsl from ..common import MultiDict from ..errors import ParamMissing from ..typing import Environ ParamsDict = Dict[str, Union[str, List[str]]] class QueryParams(MultiDict[str, str]): """A mapping from param names to lists of values. Once constructed, these instances cannot be modified. """ @classmethod def from_environ(cls, environ: Environ) -> "QueryParams": """Construct a QueryParams instance from a WSGI environ. """ return cls.parse(environ.get("QUERY_STRING", "")) @classmethod def parse(cls, query_string: str) -> "QueryParams": """Construct a QueryParams instance from a query string. """ return cls(parse_qsl(query_string)) def get(self, name: str, default: Optional[str] = None) -> Optional[str]: """Get the last value for a given key. """ try: return self[name] except ParamMissing: return default def __getitem__(self, name: str) -> str: """Get the last value for a given key. Raises: ParamMissing: When the key is missing. """ try: return self._data[name][-1] except IndexError: raise ParamMissing(name) PK!J molten/http/request.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from io import BytesIO from typing import BinaryIO, Dict, List, Optional, Union from ..typing import Environ from .headers import Headers from .query_params import ParamsDict, QueryParams class Request: """Represents an individual HTTP request. """ __slots__ = [ "method", "scheme", "host", "port", "path", "params", "headers", "body_file", ] def __init__( self, *, method: str = "GET", scheme: str = "", host: str = "", port: int = 0, path: str = "/", params: Optional[Union[ParamsDict, QueryParams]] = None, headers: Optional[Union[Dict[str, Union[str, List[str]]], Headers]] = None, body_file: Optional[BinaryIO] = None, ) -> None: self.method = method self.scheme = scheme self.host = host self.port = port self.path = path self.body_file = body_file or BytesIO() if isinstance(headers, dict): self.headers: Headers = Headers(headers) else: self.headers = headers or Headers() if not params: self.params = QueryParams() elif isinstance(params, dict): self.params = QueryParams(params) else: self.params = params @classmethod def from_environ(cls, environ: Environ) -> "Request": """Construct a Request object from a WSGI environ. """ return Request( method=environ["REQUEST_METHOD"], scheme=environ["wsgi.url_scheme"], host=environ.get("HTTP_HOST", ""), port=environ.get("SERVER_PORT", 0), path=environ.get("SCRIPT_NAME", "") + environ.get("PATH_INFO", ""), params=QueryParams.from_environ(environ), headers=Headers.from_environ(environ), body_file=environ["wsgi.input"], ) def __repr__(self) -> str: return ( f"Request(method={repr(self.method)}, scheme={repr(self.scheme)}, host={repr(self.host)}, " f"port={repr(self.port)}, path={repr(self.path)}, params={repr(self.params)}, " f"headers={repr(self.headers)}, body_file={repr(self.body_file)})" ) PK!?e[ [ molten/http/response.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import io import os from typing import BinaryIO, Optional, Union from .cookies import Cookie from .headers import Headers, HeadersDict class Response: """An HTTP response. """ __slots__ = [ "status", "headers", "stream", ] def __init__( self, status: str, headers: Optional[Union[HeadersDict, Headers]] = None, content: Optional[str] = None, stream: Optional[BinaryIO] = None, encoding: str = "utf-8", ) -> None: self.status = status if isinstance(headers, dict): self.headers = Headers(headers) else: self.headers = headers or Headers() if content is not None: self.stream: BinaryIO = io.BytesIO(content.encode(encoding)) elif stream is not None: self.stream: BinaryIO = stream else: self.stream: BinaryIO = io.BytesIO() def get_content_length(self) -> Optional[int]: """Compute the content length of this response. """ content_length = self.headers.get_int("content_length") if content_length is None: try: stream_stat = os.fstat(self.stream.fileno()) content_length = stream_stat.st_size except OSError: old_position = self.stream.tell() try: self.stream.seek(0, os.SEEK_END) content_length = self.stream.tell() finally: self.stream.seek(old_position, os.SEEK_SET) return content_length def set_cookie(self, cookie: Cookie) -> None: """Add a cookie to this response. """ self.headers.add("set-cookie", cookie.encode()) def __repr__(self) -> str: return f"Response(status={repr(self.status)}, headers={repr(self.headers)})" PK! molten/http/status_codes.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # 1xx HTTP_100 = "100 Continue" HTTP_101 = "101 Switching Protocols" HTTP_102 = "102 Processing" # 2xx HTTP_200 = "200 OK" HTTP_201 = "201 Created" HTTP_202 = "202 Accepted" HTTP_203 = "203 Non-Authoritative Information" HTTP_204 = "204 No Content" HTTP_205 = "205 Reset Content" HTTP_206 = "206 Partial Content" HTTP_207 = "207 Multi-Status" HTTP_208 = "208 Already Reported" # 3xx HTTP_300 = "300 Multiple Choices" HTTP_301 = "301 Moved Permanently" HTTP_302 = "302 Found" HTTP_303 = "303 See Other" HTTP_304 = "304 Not Modified" HTTP_305 = "305 Use Proxy" HTTP_307 = "307 Temporary Redirect" HTTP_308 = "308 Permanent Redirect" # 4xx HTTP_400 = "400 Bad Request" HTTP_401 = "401 Unauthorized" HTTP_402 = "402 Payment Required" HTTP_403 = "403 Forbidden" HTTP_404 = "404 Not Found" HTTP_405 = "405 Method Not Allowed" HTTP_406 = "406 Not Acceptable" HTTP_407 = "407 Proxy Authentication Required" HTTP_408 = "408 Request Timeout" HTTP_409 = "409 Conflict" HTTP_410 = "410 Gone" HTTP_411 = "411 Length Required" HTTP_412 = "412 Precondition Failed" HTTP_413 = "413 Payload Too Large" HTTP_414 = "414 Request-URI Too Long" HTTP_415 = "415 Unsupported Media Type" HTTP_416 = "416 Requested Range Not Satisfiable" HTTP_417 = "417 Expectation Failed" HTTP_418 = "418 I'm a teapot" HTTP_421 = "421 Misdirected Request" HTTP_422 = "422 Unprocessable Entity" HTTP_423 = "423 Locked" HTTP_424 = "424 Failed Dependency" HTTP_426 = "426 Upgrade Required" HTTP_428 = "428 Precondition Required" HTTP_429 = "429 Too Many Requests" HTTP_431 = "431 Request Header Fields Too Large" HTTP_444 = "444 Connection Closed Without Response" HTTP_451 = "451 Unavailable For Legal Reasons" HTTP_499 = "499 Client Closed Request" HTTP_415 = "415 Unsupported Media Type" # 5xx HTTP_500 = "500 Internal Server Error" HTTP_501 = "501 Not Implmeneted" HTTP_502 = "502 Bad Gateway" HTTP_503 = "503 Service Unavailable" HTTP_504 = "504 Gateway Timeout" HTTP_505 = "505 HTTP Version Not Supported" HTTP_506 = "506 Variant Also Negotiates" HTTP_507 = "507 Insufficient Storage" HTTP_508 = "508 Loop Detected" HTTP_510 = "510 Not Extended" HTTP_511 = "511 Network Authentication Required" HTTP_599 = "599 Network Connect Timeout Error" PK!!!molten/http/uploaded_file.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from shutil import copyfileobj from typing import Any, BinaryIO, Union from .headers import Headers class UploadedFile: """Represents a file that was uploaded as part of an HTTP request. May be backed by an in-memory file-like object or a real temporary file on disk. Attributes: filename: The name the file had in the request. headers: Headers sent with the file. stream: The file-like object containing the data. """ __slots__ = [ "filename", "headers", "stream", ] def __init__(self, filename: str, headers: Headers, stream: BinaryIO) -> None: self.filename = filename self.headers = headers self.stream = stream def save(self, destination: Union[str, BinaryIO]) -> None: """Save the file's contents either to another file object or to a path on disk. """ if isinstance(destination, str): with open(destination, "wb+") as outfile: copyfileobj(self.stream, outfile) else: copyfileobj(self.stream, destination) def __getattr__(self, name: str) -> Any: return getattr(self.stream, name) def __repr__(self) -> str: params = ", ".join(f"{name}={getattr(self, name)!r}" for name in self.__slots__) return f"UploadedFile({params})" PK!egmolten/middleware.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Callable, List from .errors import HeaderMissing, HTTPError, ParseError from .http import HTTP_200, HTTP_400, HTTP_406, Request, Response from .renderers import ResponseRenderer class ResponseRendererMiddleware: """A middleware that renders responses. """ def __init__(self, renderers: List[ResponseRenderer]) -> None: self.renderers = renderers def __call__(self, handler: Callable[..., Any]) -> Callable[..., Response]: def handle(request: Request) -> Response: try: response = handler() if isinstance(response, Response): return response if isinstance(response, tuple): status, response = response else: status, response = HTTP_200, response except ParseError as e: status, response = HTTP_400, {"error": str(e)} except HTTPError as e: status, response = e.status, e.response try: accept = request.headers["accept"] except HeaderMissing: accept = "*/*" for renderer in self.renderers: if accept == "*/*" or renderer.can_render_response(accept): return renderer.render(status, response) return Response(HTTP_406, content="Not Acceptable", headers={ "content-type": "text/plain", }) return handle PK!m}V(V(molten/parsers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import json import os import re from tempfile import SpooledTemporaryFile from typing import Any, Dict, Iterator, List, Tuple, Union, no_type_check from urllib.parse import parse_qsl from typing_extensions import Protocol from .common import MultiDict from .errors import FieldTooLarge, FileTooLarge, ParseError, TooManyFields from .http import Headers, UploadedFile from .typing import Header, RequestBody, RequestInput class RequestParser(Protocol): # pragma: no cover """Protocol for request parsers. """ def can_parse_content(self, content_type: str) -> bool: ... @no_type_check def parse(self): ... class JSONParser: """A JSON request parser. """ def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("application/json") def parse(self, data: RequestBody) -> Any: try: return json.loads(data) except json.JSONDecodeError: raise ParseError("JSON input could not be parsed") class URLEncodingParser: """A parser for urlencoded requests. """ def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("application/x-www-form-urlencoded") def parse(self, data: RequestBody) -> MultiDict[str, str]: try: return MultiDict(parse_qsl(data.decode("utf-8"), strict_parsing=True)) except ValueError: raise ParseError("failed to parse urlencoded data") class MultiPartParser: """A parser for multipart requests. Returns a MultiDict mapping field names to lists of field string values or UploadedFiles. This is a reasonably simple streaming parser implementation for the multipart/form-data media type. As such, it does not support deprecated parts of RFC7578 like multipart/mixed content and content-transfer-encoding headers. Parameters: bufsize: The max size of the streaming data buffer. This should be a 32 bit integer that's a multiple of 4. In some cases, the streaming data buffer may contain double this amount so take that into account when choosing a value. Additionally, the value should be greater than the longest individual header value you want to accept. encoding: The codec to use when decoding form field values. encoding_errors: What to do when an decoding error is encountered. max_field_size: The max number of bytes a field can contain. max_file_size: The max number of bytes a file can contain. max_num_fields: The max number of fields accepted per request. max_spooled_size: The max number of bytes a file in the request can have before it's written to a temporary file on disk. """ __slots__ = [ "bufsize", "encoding", "encoding_errors", "max_field_size", "max_file_size", "max_num_fields", "max_spooled_size", ] BOUNDARY_RE = re.compile("boundary=(.+)") PARAMS_RE = re.compile('([A-Za-z]+)="([^"]+)"') def __init__( self, *, bufsize: int = 64 * 1024, encoding: str = "utf-8", encoding_errors: str = "replace", max_field_size: int = 500 * 1024, max_file_size: int = 10 * 1024 * 1024, max_num_fields: int = 100, max_spooled_size: int = 1024 * 1024, ) -> None: self.bufsize = bufsize self.encoding = encoding self.encoding_errors = encoding_errors self.max_field_size = max_field_size self.max_file_size = max_file_size self.max_num_fields = max_num_fields self.max_spooled_size = max_spooled_size def can_parse_content(self, content_type: str) -> bool: return content_type.startswith("multipart/form-data") def parse(self, content_type: Header, content_length: Header, body_file: RequestInput) -> MultiDict[str, Union[str, UploadedFile]]: # noqa matches = self.BOUNDARY_RE.search(content_type) if not matches: raise ParseError("boundary missing from content-type header") boundary = matches.group(1) lines = self._iter_lines(body_file, boundary, int(content_length)) parts = self._iter_parts(lines, boundary) return MultiDict(parts) def _iter_lines(self, stream: RequestInput, boundary: str, limit: int) -> Iterator[bytes]: buff = b"" remaining = limit while remaining > 0: data = stream.read(self.bufsize) remaining -= len(data) if not data: return buff += data if remaining > 0 and len(buff) < self.bufsize: continue while buff: try: i = buff.index(b"\r\n") except ValueError: break line, buff = buff[:i + 2], buff[i + 2:] yield line if len(buff) >= self.bufsize and not buff.endswith(b"\r"): yield buff buff = b"" def _iter_parts(self, lines: Iterator[bytes], boundary: str) -> Iterator[Tuple[str, Union[str, UploadedFile]]]: next_part = f"--{boundary}\r\n".encode() last_part = f"--{boundary}--\r\n".encode() def prepare_current_part() -> Tuple[str, Union[str, UploadedFile]]: nonlocal total_field_count headers = Headers(current_part_headers) name = current_part_disposition["name"] value: Union[str, UploadedFile] if "filename" in current_part_disposition: # Strip CRLF from the end of the file and then rewind. current_part_container.seek(-2, os.SEEK_END) current_part_container.truncate() headers.add("content-length", str(current_part_container.tell())) current_part_container.seek(0) filename = current_part_disposition["filename"] value = UploadedFile(filename, headers, current_part_container) else: # Strip CRLF from the end of the buffer. data = current_part_container[:-2] value = data.decode(self.encoding, errors=self.encoding_errors) total_field_count += 1 return name, value def append_bytes(data: bytes) -> None: nonlocal current_part_container current_part_container += data total_field_count = 1 current_part_bytes: int = 0 current_part_is_file: bool = False current_part_container: Any = None current_part_writer: Any = None current_part_headers: Dict[str, Union[str, List[str]]] = {} current_part_disposition: Dict[str, str] = {} current_part_past_headers: bool = False for line in lines: if total_field_count > self.max_num_fields: raise TooManyFields("the input contains too many fields") if line == last_part: if current_part_container is not None: yield prepare_current_part() break elif line == next_part: if current_part_container is not None: yield prepare_current_part() current_part_bytes = 0 current_part_is_file = False current_part_container = None current_part_writer = None current_part_headers = {} current_part_disposition = {} current_part_past_headers = False elif not current_part_past_headers: line = line.rstrip() if not line: if current_part_container is None: raise ParseError("content-disposition header is missing") current_part_past_headers = True continue header_name, _, header_value = line.decode().partition(": ") current_part_headers[header_name] = header_value if header_name.lower() == "content-disposition": current_part_disposition = dict(self.PARAMS_RE.findall(header_value)) if "name" not in current_part_disposition: raise ParseError("content-disposition header without a name") if "filename" in current_part_disposition: current_part_is_file = True current_part_container = SpooledTemporaryFile(mode="wb+", max_size=self.max_spooled_size) current_part_writer = current_part_container.write else: current_part_is_file = False current_part_container = b"" current_part_writer = append_bytes else: current_part_bytes += len(line) if current_part_is_file and current_part_bytes >= self.max_file_size: message = f"file '{current_part_disposition['name']}' exceeds the file size limit" raise FileTooLarge(message) elif not current_part_is_file and current_part_bytes >= self.max_field_size: message = f"field '{current_part_disposition['name']}' exceeds the field size limit" raise FieldTooLarge(message) current_part_writer(line) else: raise ParseError("unexpected end of input") PK!qU4molten/py.typed# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # This package uses inline types as per PEP561.PK!Ftvvmolten/renderers.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import json from typing import Any from typing_extensions import Protocol from .http import Response from .validation import dump_schema, is_schema class ResponseRenderer(Protocol): # pragma: no cover """Protocol for response renderers. """ def can_render_response(self, accept: str) -> bool: ... def render(self, status: str, response_data: Any) -> Response: ... class JSONRenderer: """A JSON response renderer. """ def can_render_response(self, accept: str) -> bool: return accept.startswith("application/json") def render(self, status: str, response_data: Any) -> Response: content = json.dumps(response_data, default=self.default) return Response(status, content=content, headers={ "content-type": "application/json; charset=utf-8", }) def default(self, ob: Any) -> Any: """You may override this when subclassing the JSON renderer in order to encode non-standard object types. """ if is_schema(type(ob)): return dump_schema(ob) raise TypeError(f"cannot encode values of type {type(ob)}") # pragma: no cover PK!%molten/router.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import re from collections import defaultdict from typing import Any, Callable, Dict, Iterator, List, Optional, Pattern, Tuple, Union from .errors import RouteNotFound, RouteParamMissing #: Alias for things that can be added to a router. RouteLike = Union["Route", "Include"] class Route: """An individual route. """ __slots__ = [ "template", "handler", "method", "name", ] def __init__(self, template: str, handler: Callable[..., Any], method: str = "GET", name: Optional[str] = None) -> None: self.template = template self.handler = handler self.method = method self.name = name or handler.__name__ class Include: """Groups of routes prefixed by a common path. """ __slots__ = [ "prefix", "routes", ] def __init__(self, prefix: str, routes: List[RouteLike]) -> None: self.prefix = prefix self.routes = routes class Router: """A collection of routes. """ __slots__ = [ "_routes_by_name", "_routes_by_method", "_route_res_by_method", ] def __init__(self, routes: Optional[List[RouteLike]] = None) -> None: self._routes_by_name: Dict[str, Route] = {} self._routes_by_method: Dict[str, List[Route]] = defaultdict(list) self._route_res_by_method: Dict[str, List[Pattern[str]]] = defaultdict(list) self.add_routes(routes or []) def add_route(self, route_like: RouteLike, prefix: str = "") -> None: """Add a Route to this instance. """ if isinstance(route_like, Include): self.add_routes(route_like.routes, prefix + route_like.prefix) elif isinstance(route_like, Route): if route_like.name in self._routes_by_name: raise ValueError(f"a route named {route_like.name} is already registered") route = Route( template=prefix + route_like.template, handler=route_like.handler, method=route_like.method, name=route_like.name, ) self._routes_by_name[route.name] = route self._routes_by_method[route.method].insert(0, route) self._route_res_by_method[route.method].insert(0, compile_route_template(route.template)) else: # pragma: no cover raise NotImplementedError(f"unhandled type {type(route_like)}") def add_routes(self, route_likes: List[RouteLike], prefix: str = "") -> None: """Add a set of routes to this instance. """ for route_like in route_likes: self.add_route(route_like, prefix) def match(self, method: str, path: str) -> Union[None, Tuple[Route, Dict[str, str]]]: """Look up the route matching the given method and path. Returns the route and any path params. """ routes = self._routes_by_method[method] route_res = self._route_res_by_method[method] for route, route_re in zip(routes, route_res): match = route_re.match(path) if match is not None: return route, match.groupdict() return None def reverse_uri(self, route_name: str, **params: str) -> str: """Build a URI from a Route. Raises: RouteNotFound: When the route doesn't exist. RouteParamMissing: When a required parameter was not provided. """ try: route = self._routes_by_name[route_name] except KeyError: raise RouteNotFound(route_name) uri = [] for kind, token in tokenize_route_template(route.template): if kind == "binding" or kind == "glob": try: uri.append(str(params[token])) except KeyError: raise RouteParamMissing(token) elif kind == "chunk": uri.append(token) return "".join(uri) def compile_route_template(template: str) -> Pattern[str]: """Convert a route template into a regular expression. """ re_template = "" for kind, token in tokenize_route_template(template): if kind == "binding": re_template += f"(?P<{token}>[^/]+)" elif kind == "glob": re_template += f"(?P<{token}>.+)" elif kind == "chunk": re_template += token.replace(".", r"\.") else: # pragma: no cover raise NotImplementedError(f"unhandled token kind {kind!r}") return re.compile(f"^{re_template}$") def tokenize_route_template(template: str) -> Iterator[Tuple[str, str]]: """Convert a route template into a stream of tokens. """ k, i = 0, 0 while i < len(template): if template[i] == "{": yield "chunk", template[k:i] k = i kind = "binding" if template[i:i + 2] == "{*": kind = "glob" i += 1 for j in range(i + 1, len(template)): if template[j] == "}": yield kind, template[i + 1:j] k = j + 1 i = j break else: raise SyntaxError(f"unmatched {{ in route template {template!r}") i += 1 if k != i: yield "chunk", template[k:i] PK!* GGmolten/testing/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .client import TestClient from .common import to_environ __all__ = ["TestClient", "to_environ"] PK!RCCmolten/testing/client.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . import json from functools import partial from io import BytesIO from json import dumps as to_json from typing import Any, Callable, Dict, Optional, Union from urllib.parse import urlencode from ..app import BaseApp from ..http import HTTP_200 from ..http.headers import Headers, HeadersDict from ..http.query_params import ParamsDict, QueryParams from ..http.request import Request from ..http.response import Response from .common import to_environ HTTP_METHODS = {"delete", "head", "get", "patch", "post", "put"} class TestResponse: """A wrapper around Response objects that adds a few additional helper methods for testing. """ __slots__ = ["_response"] def __init__(self, response: Response) -> None: self._response = response @property def data(self) -> str: """Rewinds the output stream and returns all its data. """ self._response.stream.seek(0) return self._response.stream.read().decode("utf-8") @property def status_code(self) -> int: """Returns the HTTP status code as an integer. """ code, _, _ = self._response.status.partition(" ") return int(code) def json(self) -> Any: """Convert the response data to JSON. """ return json.loads(self.data) def __getattr__(self, name: str) -> Any: return getattr(self._response, name) class TestClient: """Test clients are used to simulate requests against an application instance. """ __slots__ = ["app"] def __init__(self, app: BaseApp) -> None: self.app = app def request( self, method: str, path: str, headers: Optional[Union[HeadersDict, Headers]] = None, params: Optional[Union[ParamsDict, QueryParams]] = None, body: Optional[bytes] = None, data: Optional[Dict[str, str]] = None, json: Optional[Any] = None, auth: Optional[Callable[[Request], Request]] = None, ) -> TestResponse: """Simulate a request against the application. Raises: RuntimeError: If both 'data' and 'json' are provided. """ if data is not None and json is not None: raise RuntimeError("either 'data' or 'json' should be provided, not both") request = Request( method=method.upper(), path=path, headers=headers, params=params, ) if body is not None: request.headers["content-length"] = f"{len(body)}" request.body_file = BytesIO(body) if data is not None: request_content = urlencode(data).encode("utf-8") request.headers["content-type"] = "application/x-www-form-urlencoded" request.headers["content-length"] = f"{len(request_content)}" request.body_file = BytesIO(request_content) elif json is not None: request_content = to_json(json).encode("utf-8") request.headers["content-type"] = "application/json; charset=utf-8" request.headers["content-length"] = f"{len(request_content)}" request.body_file = BytesIO(request_content) if auth is not None: request = auth(request) response = Response(HTTP_200) def start_response(status, response_headers, exc_info=None): # type: ignore nonlocal response response.status = status response.headers = Headers(dict(response_headers)) chunks = self.app(to_environ(request), start_response) for chunk in chunks: response.stream.write(chunk) response.stream.seek(0) return TestResponse(response) def __getattr__(self, name: str) -> Any: if name in HTTP_METHODS: return partial(self.request, name) raise AttributeError(f"unknown attribute {name}") PK!جj;molten/testing/common.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Union from urllib.parse import urlencode from ..http import Headers, QueryParams, Request from ..typing import Environ def to_environ(ob: Union[Request, Headers, QueryParams]) -> Environ: """Convert request abstractions to WSGI environ dicts. """ if isinstance(ob, Request): return { "HTTP_HOST": ob.host, "PATH_INFO": ob.path, "REQUEST_METHOD": ob.method, "SERVER_PORT": ob.port, "wsgi.input": ob.body_file, "wsgi.url_scheme": ob.scheme, **to_environ(ob.params), **to_environ(ob.headers), } elif isinstance(ob, Headers): headers = {f"HTTP_{name.upper().replace('-', '_')}": value for name, value in ob} try: headers["CONTENT_TYPE"] = headers.pop("HTTP_CONTENT_TYPE") except KeyError: pass try: headers["CONTENT_LENGTH"] = headers.pop("HTTP_CONTENT_LENGTH") except KeyError: pass return headers elif isinstance(ob, QueryParams): return {"QUERY_STRING": urlencode(list(ob))} else: # pragma: no cover raise NotImplementedError(f"to_environ cannot handle type {type(ob)}") PK!pO O molten/typing.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, BinaryIO, Callable, Dict, List, NewType, Tuple, Union #: An alias representing a WSGI environment dictionary. Environ = Dict[str, Any] #: An alias representing a WSGI start_response callback. StartResponse = Callable[[str, List[Tuple[str, str]], Any], None] #: The type of middleware functions. Middleware = Callable[[Callable[..., Any]], Callable[..., Any]] #: The request method. Method = NewType("Method", str) #: The request URI scheme. Scheme = NewType("Scheme", str) #: The request hostname. Host = NewType("Host", str) #: The request port. Port = NewType("Port", int) #: The request query string. QueryString = NewType("QueryString", str) #: A query string parameter. QueryParam = NewType("QueryParam", str) #: A header. Header = NewType("Header", str) #: A file-like object representing the request input data. RequestInput = NewType("RequestInput", BinaryIO) #: A bytestring representing the request input data. RequestBody = NewType("RequestBody", bytes) #: Parsed request data. RequestData = NewType("RequestData", Dict[str, Any]) def extract_optional_annotation(annotation: Any) -> Tuple[bool, Any]: """Returns a tuple denoting whether or not the annotation is an Optional type and the inner annotation. """ if is_optional_annotation(annotation): return True, annotation.__args__[0] return False, annotation def is_optional_annotation(annotation: Any) -> bool: """Returns True if the given annotation represents an Optional type. """ try: return getattr(annotation, "__origin__", None) is Union and \ issubclass(annotation.__args__[1], type(None)) except TypeError: # pragma: no cover return False def is_generic_annotation(annotation: Any) -> bool: """Returns True if the given annotation represents a Generic type. """ return hasattr(annotation, "__origin__") PK!gsmolten/validation/__init__.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from .common import Missing, is_schema from .field import Field, Validator from .schema import dump_schema, load_schema, schema __all__ = [ "Missing", "Field", "Validator", "schema", "dump_schema", "load_schema", "is_schema", ] PK!\Gmolten/validation/common.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Type class _Missing: """The type of missing values. """ def __repr__(self) -> str: # pragma: no cover return "Missing" #: Canary value representing missing attributes or values. Missing = _Missing() def is_schema(ob: Type[Any]) -> bool: """Returns True if the given type is a schema. """ return isinstance(ob, type) and \ hasattr(ob, "_SCHEMA") and \ hasattr(ob, "_FIELDS") PK!&a#9#9molten/validation/field.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import ( Any, Callable, Dict, Generic, List, Optional, Sequence, Type, TypeVar, Union, no_type_check ) from typing_extensions import Protocol from ..errors import FieldValidationError, ValidationError from ..typing import extract_optional_annotation, is_generic_annotation from .common import Missing, _Missing, is_schema _T = TypeVar("_T") class Validator(Protocol[_T]): # pragma: no cover """Validators ensure that values conform to arbitrary specifications. """ def can_validate_field(self, field: "Field[_T]") -> bool: """This should return True if this validator can validate the given Field. """ ... @no_type_check def validate(self, field: "Field[_T]", value: Any, **options: Any) -> _T: """Validate and possibly transform the given value. Raises: FieldValidationError: If the value is not valid. """ ... class Field(Generic[_T]): """An individual field on a schema. """ __slots__ = [ "name", "annotation", "description", "default", "default_factory", "request_name", "response_name", "request_only", "response_only", "allow_coerce", "validator", "validator_options", ] def __init__( self, name: Optional[str] = None, annotation: Optional[Type[_T]] = None, description: Optional[str] = None, default: Union[_T, _Missing] = Missing, default_factory: Optional[Callable[[], _T]] = None, request_name: Optional[str] = None, response_name: Optional[str] = None, request_only: bool = False, response_only: bool = False, allow_coerce: bool = False, validator: Optional[Validator[_T]] = None, **validator_options: Any, ) -> None: self.name = name self.annotation = annotation self.description = description self.default = default self.default_factory = default_factory self.request_name = request_name or name self.response_name = response_name or name self.request_only = request_only self.response_only = response_only self.allow_coerce = allow_coerce self.validator = validator self.validator_options = validator_options def select_validator(self) -> None: """Find a suitable Validator for this field. """ self.validator = _select_validator(self) if self.validator_options and not self.validator: raise RuntimeError(f"no validator could be selected for field {self}") @property def has_default(self) -> bool: """Returns True if the field has either a default value or a default factory. """ return self.default is not Missing or self.default_factory is not None @no_type_check def validate(self, value: Optional[Any]) -> _T: """Validate and possibly transform the given value. Raises: FieldValidationError: When the value is not valid. """ is_optional, annotation = extract_optional_annotation(self.annotation) # Distinguishing between missing values and null values is # important. Optional types can have None as a value whereas # types with a default cannot. Additionally, it's possible to # have an optional type without a default value. if value is Missing: if self.default is not Missing: return self.default elif self.default_factory: return self.default_factory() elif is_optional: return None raise FieldValidationError("this field is required") if value is None: if not is_optional: raise FieldValidationError("this field cannot be null") return value if not is_generic_annotation(annotation) and not isinstance(value, annotation) and not is_schema(annotation): if not self.allow_coerce: raise FieldValidationError(f"unexpected type {type(value).__name__}") try: value = annotation(value) except Exception as e: raise FieldValidationError(f"value could not be coerced to {annotation.__name__}") if self.validator: return self.validator.validate(self, value, **self.validator_options) return value def __repr__(self) -> str: params = ", ".join(f"{name}={repr(getattr(self, name))}" for name in self.__slots__) return f"{type(self).__name__}({params})" class NumberValidator: """Validates numbers. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return annotation is int or annotation is float def validate( self, field: Field[_T], value: Union[int, float], minimum: Optional[Union[int, float]] = None, maximum: Optional[Union[int, float]] = None, multiple_of: Optional[Union[int, float]] = None, ) -> Union[int, float]: if minimum is not None and value < minimum: raise FieldValidationError(f"value must be >= {minimum}") if maximum is not None and value > maximum: raise FieldValidationError(f"value must be <= {maximum}") if multiple_of is not None and value % multiple_of != 0: raise FieldValidationError(f"value must be a multiple of {multiple_of}") return value class StringValidator: """Validates strings. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return annotation is str def validate( self, field: Field[_T], value: str, choices: Optional[Sequence[str]] = None, min_length: Optional[int] = None, max_length: Optional[int] = None, ) -> str: if choices is not None and value not in choices: raise FieldValidationError(f"must be one of: {', '.join(repr(choice) for choice in choices)}") if min_length is not None and len(value) < min_length: raise FieldValidationError(f"length must be >= {min_length}") if max_length is not None and len(value) > max_length: raise FieldValidationError(f"length must be <= {max_length}") return value class ListValidator: """Validates lists. When a generic parameter is provided, then the values will be validated against that annotation:: @schema class Setting: name: str value: str @schema class Account: settings: List[Setting] >>> load_schema(Account, {"settings": [{"name": "a", "value": "b"}]}) Account(settings=[Setting(name="a", value="b")]) >>> load_schema(Account, {"settings": [{"name": "a"}]}) ValidationError({"settings": {0: {"value": "this field is required"}}}) When a generic parameter isn't provided, then any list is accepted. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return getattr(annotation, "__origin__", None) is List @no_type_check def validate( self, field: Field[_T], value: List[Any], min_items: Optional[int] = None, max_items: Optional[int] = None, item_validator_options: Optional[Dict[str, Any]] = None, ) -> List[Any]: if not isinstance(value, list): raise FieldValidationError("value must be a list") if min_items is not None and len(value) < min_items: raise FieldValidationError(f"length must be >= {min_items}") if max_items is not None and len(value) > max_items: raise FieldValidationError(f"length must be <= {max_items}") # If there are no args, then the list can contain anything, # otherwise each item needs to be validated. annotation_args = getattr(field.annotation, "__args__", []) if annotation_args: # This is a little piggy but it works well enough in practice. item_validator_options = item_validator_options or {} sub_field = Field(annotation=annotation_args[0], **item_validator_options) sub_field.select_validator() items = [] for i, item in enumerate(value): try: items.append(sub_field.validate(item)) except FieldValidationError as e: raise ValidationError({i: str(e)}) except ValidationError as e: raise ValidationError({i: e.reasons}) return items return value class DictValidator: """Validates dictionaries. When the ``fields`` option is provided, only the declared fields are going to be extracted from the input and will be validated. @schema class Account: settings: Dict[str, str] = Field(fields={ "a": Field(annotation=str), }) >>> load_schema(Account, {"settings": {}}) Account(settings={}) >>> load_schema(Account, {"settings": {"a": "b", "c": "d"}}) Account(settings={"settings" {"a": "b"}}) >>> load_schema(Account, {"settings": {"a": 42}}) ValidationError({"settings": {"a": "unexpected type int"}}) When the ``fields`` option is not provided and the annotation has generic parameters, then the items from the input will be validated against the generic parameter annotations:: @schema class Account: settings: Dict[str, str] >>> load_schema(Account, {"settings": {}}) Account(settings={}) >>> load_schema(Account, {"settings": {"a": "b"}}) Account(settings={"a": "b"}) >>> load_schema(Account, {"settings": {"a": 42}) # invalid ValidationError({"settings": {"a": "unexpected type int"}}) When neither ``fields`` or generic parameters are provided, then any dictionary will be accepted. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return getattr(annotation, "__origin__", None) is Dict @no_type_check def validate( self, field: Field[_T], value: Dict[Any, Any], fields: Optional[Dict[str, Field[Any]]] = None, key_validator_options: Optional[Dict[str, Any]] = None, value_validator_options: Optional[Dict[str, Any]] = None, ) -> Dict[Any, Any]: if not isinstance(value, dict): raise FieldValidationError("value must be a dict") # If a field dictionary was provided then we select specific # items from the input, otherwise we just validate the input. if fields is not None: items = {} for item_name, item_field in fields.items(): try: item_field.select_validator() item_value = value.get(item_name, Missing) items[item_name] = item_field.validate(item_value) except FieldValidationError as e: raise ValidationError({item_name: str(e)}) except ValidationError as e: raise ValidationError({item_name: e.reasons}) return items # If there are no args, then the dict can contain anything, # otherwise each item needs to be validated. annotation_args = getattr(field.annotation, "__args__", []) if annotation_args and len(annotation_args) == 2: key_validator_options = key_validator_options or {} key_field = Field(annotation=annotation_args[0], **key_validator_options) key_field.select_validator() value_validator_options = value_validator_options or {} value_field = Field(annotation=annotation_args[1], **value_validator_options) value_field.select_validator() items = {} for item_name, item_value in value.items(): try: item_name = key_field.validate(item_name) item_value = value_field.validate(item_value) items[item_name] = item_value except FieldValidationError as e: raise ValidationError({item_name: str(e)}) except ValidationError as e: raise ValidationError({item_name: e.reasons}) return items return value class SchemaValidator: """Validates dictionaries against schema classes. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return is_schema(annotation) def validate(self, field: Field[_T], value: Dict[str, Any]) -> Any: from .schema import load_schema _, annotation = extract_optional_annotation(field.annotation) return load_schema(annotation, value) #: The set of built-in validators. Fields will attempt to use one of #: these unless otherwise specified. VALIDATORS: List[Validator[Any]] = [ NumberValidator(), StringValidator(), ListValidator(), DictValidator(), SchemaValidator(), ] def _select_validator(field: Field[_T]) -> Optional[Validator[_T]]: """Find a suitable validator for the given Field. """ for validator in VALIDATORS: if validator.can_validate_field(field): return validator return None PK!|molten/validation/schema.py# This file is a part of molten. # # Copyright (C) 2018 CLEARTYPE SRL # # molten is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # molten is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . from typing import Any, Dict, List, Optional, Type, TypeVar, get_type_hints, no_type_check from ..errors import FieldValidationError, ValidationError from .common import Missing, is_schema from .field import Field _T = TypeVar("_T") def schema(cls: Type[_T]) -> Type[_T]: """Construct a schema from a class. Raises: RuntimeError: when the attributes are invalid. """ fields = {} annotations = get_type_hints(cls) found_default = False for name, annotation in annotations.items(): value = getattr(cls, name, Missing) if isinstance(value, Field): value.name = name value.annotation = annotation value.request_name = value.request_name or name value.response_name = value.response_name or name fields[name] = value else: fields[name] = Field(name=name, annotation=annotation, default=value) # At this point the field instance has an annotation for sure # so it's safe to select a Validator. field = fields[name] field.select_validator() # Make sure fields without a default don't come after fields # with one. if field.has_default: found_default = True elif found_default: raise RuntimeError("attributes without a default cannot follow ones with a default") # Remove the attribute from the class definition. if value is not Missing: delattr(cls, name) if not fields: raise RuntimeError(f"schema {cls.__name__} doesn't have any fields") setattr(cls, "__slots__", list(fields)) setattr(cls, "_SCHEMA", True) setattr(cls, "_FIELDS", fields) _add_init(cls, fields) _add_fn(cls, "__eq__", ["self", "other"], _EQ_FN_BODY) _add_fn(cls, "__repr__", ["self"], _REPR_FN_BODY) return cls @no_type_check def load_schema(schema: Type[_T], data: Dict[str, Any]) -> _T: """Validate the given data dictionary against a schema and instantiate the schema. Raises: ValidationError: When the input data is not valid. Parameters: schema: The schema class to validate the data against. data: Data to validate against and populate the schema with. """ if not is_schema(schema): raise TypeError(f"{schema} is not a schema") errors, params = {}, {} for field in schema._FIELDS.values(): if field.response_only: continue try: value = data.get(field.request_name, Missing) params[field.name] = field.validate(value) except FieldValidationError as e: errors[field.request_name] = str(e) except ValidationError as e: errors[field.request_name] = e.reasons if errors: raise ValidationError(errors) return schema(**params) @no_type_check def dump_schema(ob: Any) -> Dict[str, Any]: """Convert a schema instance into a dictionary. Raises: TypeError: If ob is not a schema instance. Parameters: ob: An instance of a schema. """ if not is_schema(type(ob)): raise TypeError(f"{ob} is not a schema") data = {} for field in ob._FIELDS.values(): if field.request_only: continue value = getattr(ob, field.name) if is_schema(type(value)): value = dump_schema(value) elif isinstance(value, list): value = [dump_schema(item) if is_schema(type(item)) else item for item in value] elif isinstance(value, dict): value = {name: dump_schema(item) if is_schema(type(item)) else item for name, item in value.items()} data[field.response_name] = value return data def _add_fn( cls: Type[Any], name: str, params: List[str], body: List[str], fn_globals: Optional[Dict[str, Any]] = None, fn_locals: Optional[Dict[str, Any]] = None, ) -> None: """Construct a function and add it to a class. """ if name in cls.__dict__: return fn_globals = {"Missing": Missing, **(fn_globals or {})} fn_locals = fn_locals or {} definition = _FN_TEMPLATE.format( name=name, params=", ".join(params), body="\n ".join(body), ) exec(definition, fn_globals, fn_locals) setattr(cls, name, fn_locals[name]) def _add_init(cls: Type[Any], fields: Dict[str, Field[_T]]) -> None: """Construct and add an init function to a schema. """ fn_globals: Dict[str, Any] = {} fn_params = ["self"] fn_body = [] for field in fields.values(): if field.default is not Missing: default_name = f"_{field.name}_default" fn_globals[default_name] = field.default fn_params.append(f"{field.name}=Missing") fn_body.append(f"self.{field.name} = {field.name} if {field.name} is not Missing else {default_name}") elif field.default_factory: factory_name = f"_{field.name}_default_factory" fn_globals[factory_name] = field.default_factory fn_params.append(f"{field.name}=Missing") fn_body.append(f"self.{field.name} = {field.name} if {field.name} is not Missing else {factory_name}()") else: fn_params.append(f"{field.name}") fn_body.append(f"self.{field.name} = {field.name}") _add_fn(cls, "__init__", fn_params, fn_body, fn_globals) _FN_TEMPLATE = """\ def {name}({params}): {body} """.rstrip() _EQ_FN_BODY = """\ try: return all(getattr(self, name) == getattr(other, name) for name in self._FIELDS) except AttributeError: return False """.rstrip().split("\n") _REPR_FN_BODY = """\ params = ', '.join(f'{name}={repr(getattr(self, name))}' for name in self._FIELDS) return f'{type(self).__name__}({params})' """.rstrip().split("\n") PK!HƣxSTmolten-0.0.1.dist-info/WHEEL A н#J;/"d&F]xzw>@Zpy3F ]n2H%_60{8&baPa>PK!Hmolten-0.0.1.dist-info/METADATAPN1 +|A"EQDYV m!"%ɪݿǥ@Ͻٞ{ |#5>Q!z^V?ZB g~p 4ߙ$iX 7KGБQWoe;zO.T%l[KIѰ=oӳ%WCj|r梁` g9c PK!HM molten-0.0.1.dist-info/RECORDuGyf@&?2:2SCՔ#{^VgjhH}N76uN-.JA!™)m_λ0r۞ jIlWvHt"T¦Ņ=s%TGhTʻ`f.#8M qxi5s)Eڵ ][e8d݃oQ|cgcV'dpr͢d 0Ǻ/ܐy6}Fd1OrU[#x5wO+fAn$u+5sPtk}+HdI!"'eGxA+-,+B2 OH}\9""|ɧ/^pBN-E{k`8M> WG;D3wTRw?ocݼH-}ҍ^(VU OwD7iK(8@nvgS3N~/{3cLA%I2W3$6ۜ,ع_/^Idn>Va:vZzɫ>cO{˶/?q腌δKմ.cSݖJ ;LBd hx aP!LA忠l_T3Iܾ5<[ ">pZ3~F l(B~lᬎWIO_.s :*)4?%N*h״ }]_*ڜןɉD<9*2h5orL o%/)@?PK!c,66molten/__init__.pyPK!L fmolten/app.pyPK!.> d%molten/common.pyPK!3D jjq1molten/components.pyPK!fP Jmolten/contrib/settings.pyPK!O[bXmolten/contrib/templates.pyPK!Űu`molten/dependency_injection.pyPK!r;{ { _wmolten/errors.pyPK!fժByymolten/http/__init__.pyPK!vmolten/http/cookies.pyPK!){ěmolten/http/headers.pyPK!;"  ۬molten/http/query_params.pyPK!J molten/http/request.pyPK!?e[ [ 0molten/http/response.pyPK! molten/http/status_codes.pyPK!!!zmolten/http/uploaded_file.pyPK!egmolten/middleware.pyPK!m}V(V(molten/parsers.pyPK!qU4Tmolten/py.typedPK!Ftvvmolten/renderers.pyPK!%8molten/router.pyPK!* GG<4molten/testing/__init__.pyPK!RCC7molten/testing/client.pyPK!جj;4Jmolten/testing/common.pyPK!pO O +Rmolten/typing.pyPK!gs\molten/validation/__init__.pyPK!\G`molten/validation/common.pyPK!&a#9#9emolten/validation/field.pyPK!|molten/validation/schema.pyPK!HƣxSTmolten-0.0.1.dist-info/WHEELPK!Hmolten-0.0.1.dist-info/METADATAPK!HM molten-0.0.1.dist-info/RECORDPK