PK ! y_C C rororo/__init__.py"""
======
rororo
======
Collection of utilities, helpers, and principles for building Python backend
applications. Supports `aiohttp.web `_,
`Flask `_, and your web-framework.
"""
__author__ = 'Igor Davydenko'
__license__ = 'BSD-3-Clause'
__version__ = '1.2.0'
PK ! dۤ
rororo/aio.py"""
==========
rororo.aio
==========
Various utilities for aiohttp and other aio-libs.
"""
from contextlib import contextmanager
from typing import ( # noqa: F401
Any,
Callable,
Dict,
Iterator,
Optional,
TYPE_CHECKING,
Union,
)
from urllib.parse import urlparse
# Hack to load ``aiohttp.web`` only on mypy run
if TYPE_CHECKING: # pragma: no cover
from aiohttp import web
else:
web = type('FakeModule', (object, ), {
'AbstractRouter': Any,
'Request': Any,
'Resource': Any,
'Response': Any,
})()
__all__ = ('add_resource_context', 'is_xhr_request', 'parse_aioredis_url')
View = Callable[[web.Request], web.Response]
@contextmanager
def add_resource_context(router: web.AbstractRouter,
url_prefix: str=None,
name_prefix: str=None) -> Iterator[Any]:
"""Context manager for adding resources for given router.
Main goal of context manager to easify process of adding resources with
routes to the router. This also allow to reduce amount of repeats, when
supplying new resources by reusing URL & name prefixes for all routes
inside context manager.
Behind the scene, context manager returns a function which calls::
resource = router.add_resource(url, name)
resource.add_route(method, handler)
**Usage**::
with add_resource_context(app.router, '/api', 'api') as add_resource:
add_resource('/', get=views.index)
add_resource('/news', get=views.list_news, post=views.create_news)
:param router: Route to add resources to.
:param url_prefix: If supplied prepend this prefix to each resource URL.
:param name_prefix: If supplied prepend this prefix to each resource name.
"""
def add_resource(url: str,
get: View=None,
*,
name: str=None,
**kwargs: Any) -> web.Resource:
"""Inner function to create resource and add necessary routes to it.
Support adding routes of all methods, supported by aiohttp, as
GET/POST/PUT/PATCH/DELETE/HEAD/OPTIONS/*, e.g.,
::
with add_resource_context(app.router) as add_resource:
add_resource('/', get=views.get, post=views.post)
add_resource('/wildcard', **{'*': views.wildcard})
:param url:
Resource URL. If ``url_prefix`` setup in context it will be
prepended to URL with ``/``.
:param get:
GET handler. Only handler to be setup without explicit call.
:param name: Resource name.
:type name: str
:rtype: aiohttp.web.Resource
"""
kwargs['get'] = get
if url_prefix:
url = '/'.join((url_prefix.rstrip('/'), url.lstrip('/')))
if not name and get:
name = get.__name__
if name_prefix and name:
name = '.'.join((name_prefix.rstrip('.'), name.lstrip('.')))
resource = router.add_resource(url, name=name)
for method, handler in kwargs.items():
if handler is None:
continue
resource.add_route(method.upper(), handler)
return resource
yield add_resource
def is_xhr_request(request: web.Request) -> bool:
"""Check whether current request is XHR one or not.
Basically it just checks that request contains ``X-Requested-With`` header
and that the header equals to ``XMLHttpRequest``.
:param request: Request instance.
"""
return request.headers.get('X-Requested-With') == 'XMLHttpRequest'
def parse_aioredis_url(url: str) -> Dict[str, Any]:
"""
Convert Redis URL string to dict suitable to pass to
``aioredis.create_redis(...)`` call.
**Usage**::
async def connect_redis(url=None):
url = url or 'redis://localhost:6379/0'
return await create_redis(**get_aioredis_parts(url))
:param url: URL to access Redis instance, started with ``redis://``.
"""
parts = urlparse(url)
db = parts.path[1:] or None # type: Optional[Union[str, int]]
if db:
db = int(db)
return {'address': (parts.hostname, parts.port or 6379),
'db': db,
'password': parts.password}
PK ! %]XL L rororo/annotations.py"""
==================
rororo.annotations
==================
Internal module to keep all reusable type annotations for the project in one
place.
"""
import types
from typing import Any, Dict, TypeVar, Union
DictStrInt = Dict[str, int]
DictStrAny = Dict[str, Any]
Settings = Union[types.ModuleType, DictStrAny]
T = TypeVar('T')
PK ! *XbB rororo/logger.py"""
=============
rororo.logger
=============
Logging utilities.
Module provides easy way to setup logging for your web application.
"""
import logging
import sys
from typing import Any, Optional, Union
from .annotations import DictStrAny
class IgnoreErrorsFilter(object):
"""Ignore all warnings and errors from stdout handler."""
def filter(self, record: logging.LogRecord) -> bool:
"""Allow only debug and info log messages to stdout handler."""
return record.levelname in {'DEBUG', 'INFO'}
def default_logging_dict(*loggers: str, **kwargs: Any) -> DictStrAny:
r"""Prepare logging dict suitable with ``logging.config.dictConfig``.
**Usage**::
from logging.config import dictConfig
dictConfig(default_logging_dict('yourlogger'))
:param \*loggers: Enable logging for each logger in sequence.
:param \*\*kwargs: Setup additional logger params via keyword arguments.
"""
kwargs.setdefault('level', 'INFO')
return {
'version': 1,
'disable_existing_loggers': True,
'filters': {
'ignore_errors': {
'()': IgnoreErrorsFilter,
},
},
'formatters': {
'default': {
'format': '%(asctime)s [%(levelname)s:%(name)s] %(message)s',
},
'naked': {
'format': u'%(message)s',
},
},
'handlers': {
'stdout': {
'class': 'logging.StreamHandler',
'filters': ['ignore_errors'],
'formatter': 'default',
'level': 'DEBUG',
'stream': sys.stdout,
},
'stderr': {
'class': 'logging.StreamHandler',
'formatter': 'default',
'level': 'WARNING',
'stream': sys.stderr,
},
},
'loggers': {
logger: dict(handlers=['stdout', 'stderr'], **kwargs)
for logger in loggers
},
}
def update_sentry_logging(logging_dict: DictStrAny,
sentry_dsn: Optional[str],
*loggers: str,
level: Union[str, int]=None,
**kwargs: Any) -> None:
r"""Enable Sentry logging if Sentry DSN passed.
.. note::
Sentry logging requires `raven `_
library to be installed.
**Usage**::
from logging.config import dictConfig
LOGGING = default_logging_dict()
SENTRY_DSN = '...'
update_sentry_logging(LOGGING, SENTRY_DSN)
dictConfig(LOGGING)
**Using AioHttpTransport for SentryHandler**
This will allow to use ``aiohttp.client`` for pushing data to Sentry in
your ``aiohttp.web`` app, which means elimination of sync calls to Sentry.
::
from raven_aiohttp import AioHttpTransport
update_sentry_logging(LOGGING, SENTRY_DSN, transport=AioHttpTransport)
:param logging_dict: Logging dict.
:param sentry_dsn:
Sentry DSN value. If ``None`` do not update logging dict at all.
:param \*loggers:
Use Sentry logging for each logger in the sequence. If the sequence is
empty use Sentry logging to each available logger.
:param \*\*kwargs: Additional kwargs to be passed to ``SentryHandler``.
"""
# No Sentry DSN, nothing to do
if not sentry_dsn:
return
# Add Sentry handler
kwargs['class'] = 'raven.handlers.logging.SentryHandler'
kwargs['dsn'] = sentry_dsn
logging_dict['handlers']['sentry'] = dict(
level=level or 'WARNING',
**kwargs)
loggers = tuple(logging_dict['loggers']) if not loggers else loggers
for logger in loggers:
# Ignore missing loggers
logger_dict = logging_dict['loggers'].get(logger)
if not logger_dict:
continue
# Ignore logger from logger config
if logger_dict.pop('ignore_sentry', False):
continue
# Handlers list should exist
handlers = list(logger_dict.setdefault('handlers', []))
handlers.append('sentry')
logger_dict['handlers'] = tuple(handlers)
PK ! rororo/py.typedPK ! Y Y rororo/schemas/__init__.py"""
==============
rororo.schemas
==============
Validate request and response data using JSON Schema.
"""
from .exceptions import Error
from .schema import Schema
from .utils import defaults
from .validators import DefaultValidator, Validator
( # Make PEP8 happy
defaults,
DefaultValidator,
Error,
Schema,
Validator,
)
PK ! |A1% % rororo/schemas/empty.py"""
====================
rororo.schemas.empty
====================
Collection of empty JSON Schemas, useful for validation empty requests or
responses.
"""
EMPTY_ARRAY = {
'type': 'array',
'maxItems': 0,
}
EMPTY_OBJECT = {
'type': 'object',
'additionalProperties': False,
}
PK ! iE E rororo/schemas/exceptions.py"""
=========================
rororo.schemas.exceptions
=========================
Exceptions for Schemas.
.. note:: If data cannot be validate against given schema ValidationError from
``jsonschema`` library would be raised, not ``rororo.schemas.Error``.
"""
class Error(Exception):
"""Base Schema exception."""
PK ! rororo/schemas/py.typedPK ! E] ] rororo/schemas/schema.py"""
=====================
rororo.schemas.schema
=====================
Implement class for validating request and response data against JSON Schema.
"""
import logging
import types
from typing import Any, Callable, Optional, Type # noqa: F401
from jsonschema.exceptions import ValidationError
from .exceptions import Error
from .utils import AnyMapping, defaults, validate_func_factory, ValidateFunc
from .validators import DefaultValidator
__all__ = ('Schema', )
logger = logging.getLogger(__name__)
class Schema(object):
"""Validate request and response data against JSON Schema."""
__slots__ = (
'error_class', 'module', 'response_factory', 'validate_func',
'validation_error_class', 'validator_class', '_valid_request',
)
def __init__(self,
module: types.ModuleType,
*,
response_factory: Callable[..., Any]=None,
error_class: Any=None,
validator_class: Any=DefaultValidator,
validation_error_class: Type[Exception]=ValidationError,
validate_func: ValidateFunc=None) -> None:
"""Initialize Schema object.
:param module: Module contains at least request and response schemas.
:param response_factory: Put valid response data to this factory func.
:param error_class:
Wrap all errors in given class. If empty real errors would be
reraised.
:param validator_class:
Validator class to use for validating request and response data.
By default: ``rororo.schemas.validators.DefaultValidator``
:param validation_error_class:
Error class to be expected in case of validation error. By default:
``jsonschema.exceptions.ValidationError``
:param validate_func:
Validate function to be called for validating request and response
data. Function will receive 2 args: ``schema`` and ``pure_data``.
By default: ``None``
"""
self._valid_request = None # type: Optional[bool]
self.module = module
self.response_factory = response_factory
self.error_class = error_class
self.validator_class = validator_class
self.validate_func = (
validate_func or
validate_func_factory(validator_class))
self.validation_error_class = validation_error_class
def make_error(self,
message: str,
*,
error: Exception=None,
# ``error_class: Type[Exception]=None`` doesn't work on
# Python 3.5.2, but that is exact version ran by Read the
# Docs :( More info: http://stackoverflow.com/q/42942867
error_class: Any=None) -> Exception:
"""Return error instantiated from given message.
:param message: Message to wrap.
:param error: Validation error.
:param error_class:
Special class to wrap error message into. When omitted
``self.error_class`` will be used.
"""
if error_class is None:
error_class = self.error_class if self.error_class else Error
return error_class(message)
def make_response(self,
data: Any=None,
**kwargs: Any) -> Any:
r"""Validate response data and wrap it inside response factory.
:param data: Response data. Could be ommited.
:param \*\*kwargs: Keyword arguments to be passed to response factory.
"""
if not self._valid_request:
logger.error('Request not validated, cannot make response')
raise self.make_error('Request not validated before, cannot make '
'response')
if data is None and self.response_factory is None:
logger.error('Response data omit, but no response factory is used')
raise self.make_error('Response data could be omitted only when '
'response factory is used')
response_schema = getattr(self.module, 'response', None)
if response_schema is not None:
self._validate(data, response_schema)
if self.response_factory is not None:
return self.response_factory(
*([data] if data is not None else []),
**kwargs)
return data
def validate_request(self,
data: Any,
*additional: AnyMapping,
merged_class: Type[dict]=dict) -> Any:
r"""Validate request data against request schema from module.
:param data: Request data.
:param \*additional:
Additional data dicts to be merged with base request data.
:param merged_class:
When additional data dicts supplied method by default will return
merged **dict** with all data, but you can customize things to
use read-only dict or any other additional class or callable.
"""
request_schema = getattr(self.module, 'request', None)
if request_schema is None:
logger.error(
'Request schema should be defined',
extra={'schema_module': self.module,
'schema_module_attrs': dir(self.module)})
raise self.make_error('Request schema should be defined')
# Merge base and additional data dicts, but only if additional data
# dicts have been supplied
if isinstance(data, dict) and additional:
data = merged_class(self._merge_data(data, *additional))
try:
self._validate(data, request_schema)
finally:
self._valid_request = False
self._valid_request = True
processor = getattr(self.module, 'request_processor', None)
return processor(data) if processor else data
def _merge_data(self, data: AnyMapping, *additional: AnyMapping) -> dict:
r"""Merge base data and additional dicts.
:param data: Base data.
:param \*additional: Additional data dicts to be merged into base dict.
"""
return defaults(
dict(data) if not isinstance(data, dict) else data,
*(dict(item) for item in additional))
def _pure_data(self, data: Any) -> Any:
"""
If data is dict-like object, convert it to pure dict instance, so it
will be possible to pass to default ``jsonschema.validate`` func.
:param data: Request or response data.
"""
if not isinstance(data, dict) and not isinstance(data, list):
try:
return dict(data)
except TypeError:
...
return data
def _validate(self, data: Any, schema: AnyMapping) -> Any:
"""Validate data against given schema.
:param data: Data to validate.
:param schema: Schema to use for validation.
"""
try:
return self.validate_func(schema, self._pure_data(data))
except self.validation_error_class as err:
logger.error(
'Schema validation error',
exc_info=True,
extra={'schema': schema, 'schema_module': self.module})
if self.error_class is None:
raise
raise self.make_error('Validation Error', error=err) from err
PK ! drB B rororo/schemas/utils.py"""
====================
rororo.schemas.utils
====================
Utilities for Schema package.
"""
from typing import Any, Callable, Mapping
AnyMapping = Mapping[Any, Any]
ValidateFunc = Callable[[AnyMapping, AnyMapping], AnyMapping]
def defaults(current: dict, *args: AnyMapping) -> dict:
r"""Override current dict with defaults values.
:param current: Current dict.
:param \*args: Sequence with default data dicts.
"""
for data in args:
for key, value in data.items():
current.setdefault(key, value)
return current
def validate_func_factory(validator_class: Any) -> ValidateFunc:
"""Provide default function for Schema validation.
:param validator_class: JSONSchema suitable validator class.
"""
def validate_func(schema: AnyMapping, pure_data: AnyMapping) -> AnyMapping:
"""Validate schema with given data.
:param schema: Schema representation to use.
:param pure_data: Pure data to validate.
"""
return validator_class(schema).validate(pure_data)
return validate_func
PK ! O
fa rororo/schemas/validators.py"""
=========================
rororo.schemas.validators
=========================
Customize default JSON Schema Draft 4 validator.
"""
from typing import Any, Iterator
from jsonschema.exceptions import ValidationError
from jsonschema.validators import Draft4Validator, extend
from .utils import defaults
class Validator(Draft4Validator):
"""Customize default JSON Schema validator.
This customization allows:
* Use tuples for "array" data type
"""
DEFAULT_TYPES = defaults({
'array': (list, tuple),
}, Draft4Validator.DEFAULT_TYPES) # type: dict
def extend_with_default(validator_class: Any) -> Any:
"""Append defaults from schema to instance need to be validated.
:param validator_class: Apply the change for given validator class.
"""
validate_properties = validator_class.VALIDATORS['properties']
def set_defaults(validator: Any,
properties: dict,
instance: dict,
schema: dict) -> Iterator[ValidationError]:
for prop, subschema in properties.items():
if 'default' in subschema:
instance.setdefault(prop, subschema['default'])
for error in validate_properties(
validator, properties, instance, schema,
):
yield error # pragma: no cover
return extend(validator_class, {'properties': set_defaults})
DefaultValidator = extend_with_default(Validator)
PK !