PK!y_CCrororo/__init__.py""" ====== rororo ====== Collection of utilities, helpers, and principles for building Python backend applications. Supports `aiohttp.web `_, `Flask `_, and your web-framework. """ __author__ = 'Igor Davydenko' __license__ = 'BSD-3-Clause' __version__ = '1.2.0' PK!dۤ rororo/aio.py""" ========== rororo.aio ========== Various utilities for aiohttp and other aio-libs. """ from contextlib import contextmanager from typing import ( # noqa: F401 Any, Callable, Dict, Iterator, Optional, TYPE_CHECKING, Union, ) from urllib.parse import urlparse # Hack to load ``aiohttp.web`` only on mypy run if TYPE_CHECKING: # pragma: no cover from aiohttp import web else: web = type('FakeModule', (object, ), { 'AbstractRouter': Any, 'Request': Any, 'Resource': Any, 'Response': Any, })() __all__ = ('add_resource_context', 'is_xhr_request', 'parse_aioredis_url') View = Callable[[web.Request], web.Response] @contextmanager def add_resource_context(router: web.AbstractRouter, url_prefix: str=None, name_prefix: str=None) -> Iterator[Any]: """Context manager for adding resources for given router. Main goal of context manager to easify process of adding resources with routes to the router. This also allow to reduce amount of repeats, when supplying new resources by reusing URL & name prefixes for all routes inside context manager. Behind the scene, context manager returns a function which calls:: resource = router.add_resource(url, name) resource.add_route(method, handler) **Usage**:: with add_resource_context(app.router, '/api', 'api') as add_resource: add_resource('/', get=views.index) add_resource('/news', get=views.list_news, post=views.create_news) :param router: Route to add resources to. :param url_prefix: If supplied prepend this prefix to each resource URL. :param name_prefix: If supplied prepend this prefix to each resource name. """ def add_resource(url: str, get: View=None, *, name: str=None, **kwargs: Any) -> web.Resource: """Inner function to create resource and add necessary routes to it. Support adding routes of all methods, supported by aiohttp, as GET/POST/PUT/PATCH/DELETE/HEAD/OPTIONS/*, e.g., :: with add_resource_context(app.router) as add_resource: add_resource('/', get=views.get, post=views.post) add_resource('/wildcard', **{'*': views.wildcard}) :param url: Resource URL. If ``url_prefix`` setup in context it will be prepended to URL with ``/``. :param get: GET handler. Only handler to be setup without explicit call. :param name: Resource name. :type name: str :rtype: aiohttp.web.Resource """ kwargs['get'] = get if url_prefix: url = '/'.join((url_prefix.rstrip('/'), url.lstrip('/'))) if not name and get: name = get.__name__ if name_prefix and name: name = '.'.join((name_prefix.rstrip('.'), name.lstrip('.'))) resource = router.add_resource(url, name=name) for method, handler in kwargs.items(): if handler is None: continue resource.add_route(method.upper(), handler) return resource yield add_resource def is_xhr_request(request: web.Request) -> bool: """Check whether current request is XHR one or not. Basically it just checks that request contains ``X-Requested-With`` header and that the header equals to ``XMLHttpRequest``. :param request: Request instance. """ return request.headers.get('X-Requested-With') == 'XMLHttpRequest' def parse_aioredis_url(url: str) -> Dict[str, Any]: """ Convert Redis URL string to dict suitable to pass to ``aioredis.create_redis(...)`` call. **Usage**:: async def connect_redis(url=None): url = url or 'redis://localhost:6379/0' return await create_redis(**get_aioredis_parts(url)) :param url: URL to access Redis instance, started with ``redis://``. """ parts = urlparse(url) db = parts.path[1:] or None # type: Optional[Union[str, int]] if db: db = int(db) return {'address': (parts.hostname, parts.port or 6379), 'db': db, 'password': parts.password} PK!%]XLLrororo/annotations.py""" ================== rororo.annotations ================== Internal module to keep all reusable type annotations for the project in one place. """ import types from typing import Any, Dict, TypeVar, Union DictStrInt = Dict[str, int] DictStrAny = Dict[str, Any] Settings = Union[types.ModuleType, DictStrAny] T = TypeVar('T') PK!*XbBrororo/logger.py""" ============= rororo.logger ============= Logging utilities. Module provides easy way to setup logging for your web application. """ import logging import sys from typing import Any, Optional, Union from .annotations import DictStrAny class IgnoreErrorsFilter(object): """Ignore all warnings and errors from stdout handler.""" def filter(self, record: logging.LogRecord) -> bool: """Allow only debug and info log messages to stdout handler.""" return record.levelname in {'DEBUG', 'INFO'} def default_logging_dict(*loggers: str, **kwargs: Any) -> DictStrAny: r"""Prepare logging dict suitable with ``logging.config.dictConfig``. **Usage**:: from logging.config import dictConfig dictConfig(default_logging_dict('yourlogger')) :param \*loggers: Enable logging for each logger in sequence. :param \*\*kwargs: Setup additional logger params via keyword arguments. """ kwargs.setdefault('level', 'INFO') return { 'version': 1, 'disable_existing_loggers': True, 'filters': { 'ignore_errors': { '()': IgnoreErrorsFilter, }, }, 'formatters': { 'default': { 'format': '%(asctime)s [%(levelname)s:%(name)s] %(message)s', }, 'naked': { 'format': u'%(message)s', }, }, 'handlers': { 'stdout': { 'class': 'logging.StreamHandler', 'filters': ['ignore_errors'], 'formatter': 'default', 'level': 'DEBUG', 'stream': sys.stdout, }, 'stderr': { 'class': 'logging.StreamHandler', 'formatter': 'default', 'level': 'WARNING', 'stream': sys.stderr, }, }, 'loggers': { logger: dict(handlers=['stdout', 'stderr'], **kwargs) for logger in loggers }, } def update_sentry_logging(logging_dict: DictStrAny, sentry_dsn: Optional[str], *loggers: str, level: Union[str, int]=None, **kwargs: Any) -> None: r"""Enable Sentry logging if Sentry DSN passed. .. note:: Sentry logging requires `raven `_ library to be installed. **Usage**:: from logging.config import dictConfig LOGGING = default_logging_dict() SENTRY_DSN = '...' update_sentry_logging(LOGGING, SENTRY_DSN) dictConfig(LOGGING) **Using AioHttpTransport for SentryHandler** This will allow to use ``aiohttp.client`` for pushing data to Sentry in your ``aiohttp.web`` app, which means elimination of sync calls to Sentry. :: from raven_aiohttp import AioHttpTransport update_sentry_logging(LOGGING, SENTRY_DSN, transport=AioHttpTransport) :param logging_dict: Logging dict. :param sentry_dsn: Sentry DSN value. If ``None`` do not update logging dict at all. :param \*loggers: Use Sentry logging for each logger in the sequence. If the sequence is empty use Sentry logging to each available logger. :param \*\*kwargs: Additional kwargs to be passed to ``SentryHandler``. """ # No Sentry DSN, nothing to do if not sentry_dsn: return # Add Sentry handler kwargs['class'] = 'raven.handlers.logging.SentryHandler' kwargs['dsn'] = sentry_dsn logging_dict['handlers']['sentry'] = dict( level=level or 'WARNING', **kwargs) loggers = tuple(logging_dict['loggers']) if not loggers else loggers for logger in loggers: # Ignore missing loggers logger_dict = logging_dict['loggers'].get(logger) if not logger_dict: continue # Ignore logger from logger config if logger_dict.pop('ignore_sentry', False): continue # Handlers list should exist handlers = list(logger_dict.setdefault('handlers', [])) handlers.append('sentry') logger_dict['handlers'] = tuple(handlers) PK!rororo/py.typedPK!YYrororo/schemas/__init__.py""" ============== rororo.schemas ============== Validate request and response data using JSON Schema. """ from .exceptions import Error from .schema import Schema from .utils import defaults from .validators import DefaultValidator, Validator ( # Make PEP8 happy defaults, DefaultValidator, Error, Schema, Validator, ) PK!|A1%%rororo/schemas/empty.py""" ==================== rororo.schemas.empty ==================== Collection of empty JSON Schemas, useful for validation empty requests or responses. """ EMPTY_ARRAY = { 'type': 'array', 'maxItems': 0, } EMPTY_OBJECT = { 'type': 'object', 'additionalProperties': False, } PK!iEErororo/schemas/exceptions.py""" ========================= rororo.schemas.exceptions ========================= Exceptions for Schemas. .. note:: If data cannot be validate against given schema ValidationError from ``jsonschema`` library would be raised, not ``rororo.schemas.Error``. """ class Error(Exception): """Base Schema exception.""" PK!rororo/schemas/py.typedPK!؁E]]rororo/schemas/schema.py""" ===================== rororo.schemas.schema ===================== Implement class for validating request and response data against JSON Schema. """ import logging import types from typing import Any, Callable, Optional, Type # noqa: F401 from jsonschema.exceptions import ValidationError from .exceptions import Error from .utils import AnyMapping, defaults, validate_func_factory, ValidateFunc from .validators import DefaultValidator __all__ = ('Schema', ) logger = logging.getLogger(__name__) class Schema(object): """Validate request and response data against JSON Schema.""" __slots__ = ( 'error_class', 'module', 'response_factory', 'validate_func', 'validation_error_class', 'validator_class', '_valid_request', ) def __init__(self, module: types.ModuleType, *, response_factory: Callable[..., Any]=None, error_class: Any=None, validator_class: Any=DefaultValidator, validation_error_class: Type[Exception]=ValidationError, validate_func: ValidateFunc=None) -> None: """Initialize Schema object. :param module: Module contains at least request and response schemas. :param response_factory: Put valid response data to this factory func. :param error_class: Wrap all errors in given class. If empty real errors would be reraised. :param validator_class: Validator class to use for validating request and response data. By default: ``rororo.schemas.validators.DefaultValidator`` :param validation_error_class: Error class to be expected in case of validation error. By default: ``jsonschema.exceptions.ValidationError`` :param validate_func: Validate function to be called for validating request and response data. Function will receive 2 args: ``schema`` and ``pure_data``. By default: ``None`` """ self._valid_request = None # type: Optional[bool] self.module = module self.response_factory = response_factory self.error_class = error_class self.validator_class = validator_class self.validate_func = ( validate_func or validate_func_factory(validator_class)) self.validation_error_class = validation_error_class def make_error(self, message: str, *, error: Exception=None, # ``error_class: Type[Exception]=None`` doesn't work on # Python 3.5.2, but that is exact version ran by Read the # Docs :( More info: http://stackoverflow.com/q/42942867 error_class: Any=None) -> Exception: """Return error instantiated from given message. :param message: Message to wrap. :param error: Validation error. :param error_class: Special class to wrap error message into. When omitted ``self.error_class`` will be used. """ if error_class is None: error_class = self.error_class if self.error_class else Error return error_class(message) def make_response(self, data: Any=None, **kwargs: Any) -> Any: r"""Validate response data and wrap it inside response factory. :param data: Response data. Could be ommited. :param \*\*kwargs: Keyword arguments to be passed to response factory. """ if not self._valid_request: logger.error('Request not validated, cannot make response') raise self.make_error('Request not validated before, cannot make ' 'response') if data is None and self.response_factory is None: logger.error('Response data omit, but no response factory is used') raise self.make_error('Response data could be omitted only when ' 'response factory is used') response_schema = getattr(self.module, 'response', None) if response_schema is not None: self._validate(data, response_schema) if self.response_factory is not None: return self.response_factory( *([data] if data is not None else []), **kwargs) return data def validate_request(self, data: Any, *additional: AnyMapping, merged_class: Type[dict]=dict) -> Any: r"""Validate request data against request schema from module. :param data: Request data. :param \*additional: Additional data dicts to be merged with base request data. :param merged_class: When additional data dicts supplied method by default will return merged **dict** with all data, but you can customize things to use read-only dict or any other additional class or callable. """ request_schema = getattr(self.module, 'request', None) if request_schema is None: logger.error( 'Request schema should be defined', extra={'schema_module': self.module, 'schema_module_attrs': dir(self.module)}) raise self.make_error('Request schema should be defined') # Merge base and additional data dicts, but only if additional data # dicts have been supplied if isinstance(data, dict) and additional: data = merged_class(self._merge_data(data, *additional)) try: self._validate(data, request_schema) finally: self._valid_request = False self._valid_request = True processor = getattr(self.module, 'request_processor', None) return processor(data) if processor else data def _merge_data(self, data: AnyMapping, *additional: AnyMapping) -> dict: r"""Merge base data and additional dicts. :param data: Base data. :param \*additional: Additional data dicts to be merged into base dict. """ return defaults( dict(data) if not isinstance(data, dict) else data, *(dict(item) for item in additional)) def _pure_data(self, data: Any) -> Any: """ If data is dict-like object, convert it to pure dict instance, so it will be possible to pass to default ``jsonschema.validate`` func. :param data: Request or response data. """ if not isinstance(data, dict) and not isinstance(data, list): try: return dict(data) except TypeError: ... return data def _validate(self, data: Any, schema: AnyMapping) -> Any: """Validate data against given schema. :param data: Data to validate. :param schema: Schema to use for validation. """ try: return self.validate_func(schema, self._pure_data(data)) except self.validation_error_class as err: logger.error( 'Schema validation error', exc_info=True, extra={'schema': schema, 'schema_module': self.module}) if self.error_class is None: raise raise self.make_error('Validation Error', error=err) from err PK!drBBrororo/schemas/utils.py""" ==================== rororo.schemas.utils ==================== Utilities for Schema package. """ from typing import Any, Callable, Mapping AnyMapping = Mapping[Any, Any] ValidateFunc = Callable[[AnyMapping, AnyMapping], AnyMapping] def defaults(current: dict, *args: AnyMapping) -> dict: r"""Override current dict with defaults values. :param current: Current dict. :param \*args: Sequence with default data dicts. """ for data in args: for key, value in data.items(): current.setdefault(key, value) return current def validate_func_factory(validator_class: Any) -> ValidateFunc: """Provide default function for Schema validation. :param validator_class: JSONSchema suitable validator class. """ def validate_func(schema: AnyMapping, pure_data: AnyMapping) -> AnyMapping: """Validate schema with given data. :param schema: Schema representation to use. :param pure_data: Pure data to validate. """ return validator_class(schema).validate(pure_data) return validate_func PK!O farororo/schemas/validators.py""" ========================= rororo.schemas.validators ========================= Customize default JSON Schema Draft 4 validator. """ from typing import Any, Iterator from jsonschema.exceptions import ValidationError from jsonschema.validators import Draft4Validator, extend from .utils import defaults class Validator(Draft4Validator): """Customize default JSON Schema validator. This customization allows: * Use tuples for "array" data type """ DEFAULT_TYPES = defaults({ 'array': (list, tuple), }, Draft4Validator.DEFAULT_TYPES) # type: dict def extend_with_default(validator_class: Any) -> Any: """Append defaults from schema to instance need to be validated. :param validator_class: Apply the change for given validator class. """ validate_properties = validator_class.VALIDATORS['properties'] def set_defaults(validator: Any, properties: dict, instance: dict, schema: dict) -> Iterator[ValidationError]: for prop, subschema in properties.items(): if 'default' in subschema: instance.setdefault(prop, subschema['default']) for error in validate_properties( validator, properties, instance, schema, ): yield error # pragma: no cover return extend(validator_class, {'properties': set_defaults}) DefaultValidator = extend_with_default(Validator) PK! Union[str, Optional[T]]: """Shortcut for safely reading environment variable. :param key: Environment var key. :param default: Return default value if environment var not found by given key. By default: ``None`` """ return os.getenv(key, default) def immutable_settings(defaults: Settings, **optionals: Any) -> types.MappingProxyType: r"""Initialize and return immutable Settings dictionary. Settings dictionary allows you to setup settings values from multiple sources and make sure that values cannot be changed, updated by anyone else after initialization. This helps keep things clear and not worry about hidden settings change somewhere around your web application. :param defaults: Read settings values from module or dict-like instance. :param \*\*optionals: Update base settings with optional values. In common additional values shouldn't be passed, if settings values already populated from local settings or environment. But in case of using application factories this makes sense:: from . import settings def create_app(**options): app = ... app.settings = immutable_settings(settings, **options) return app And yes each additional key overwrite default setting value. """ settings = {key: value for key, value in iter_settings(defaults)} for key, value in iter_settings(optionals): settings[key] = value return types.MappingProxyType(settings) def inject_settings(mixed: Union[str, Settings], context: MutableMapping[str, Any], fail_silently: bool=False) -> None: """Inject settings values to given context. :param mixed: Settings can be a string (that it will be read from Python path), Python module or dict-like instance. :param context: Context to assign settings key values. It should support dict-like item assingment. :param fail_silently: When enabled and reading settings from Python path ignore errors if given Python path couldn't be loaded. """ if isinstance(mixed, str): try: mixed = import_module(mixed) except Exception: if fail_silently: return raise for key, value in iter_settings(mixed): context[key] = value def is_setting_key(key: str) -> bool: """Check whether given key is valid setting key or not. Only public uppercase constants are valid settings keys, all other keys are invalid and shouldn't present in Settings dict. **Valid settings keys** :: DEBUG SECRET_KEY **Invalid settings keys** :: _PRIVATE_SECRET_KEY camelCasedSetting rel secret_key :param key: Key to check. """ return key.isupper() and key[0] != '_' def iter_settings(mixed: Settings) -> Iterator[Tuple[str, Any]]: """Iterate over settings values from settings module or dict-like instance. :param mixed: Settings instance to iterate. """ if isinstance(mixed, types.ModuleType): for attr in dir(mixed): if not is_setting_key(attr): continue yield (attr, getattr(mixed, attr)) else: yield from filter(lambda item: is_setting_key(item[0]), mixed.items()) def setup_locale(locale: str, first_weekday: int=None) -> str: """Shortcut helper to setup locale for backend application. :param locale: Locale to use. :param first_weekday: Weekday for start week. 0 for Monday, 6 for Sunday. By default: None """ if first_weekday is not None: calendar.setfirstweekday(first_weekday) return setlocale(LC_ALL, locale) def setup_timezone(timezone: str) -> None: """Shortcut helper to configure timezone for backend application. :param timezone: Timezone to use, e.g. "UTC", "Europe/Kiev". """ if timezone and hasattr(time, 'tzset'): tz_root = '/usr/share/zoneinfo' tz_filename = os.path.join(tz_root, *(timezone.split('/'))) if os.path.exists(tz_root) and not os.path.exists(tz_filename): raise ValueError('Incorrect timezone value: {0}'.format(timezone)) os.environ['TZ'] = timezone time.tzset() # Make flake8 happy (setup_logging, to_bool) PK!UkOOrororo/timedelta.py""" ================ rororo.timedelta ================ Useful functions to work with timedelta instances. """ import datetime import re from typing import Optional from .annotations import DictStrInt from .utils import to_int __all__ = ( 'str_to_timedelta', 'timedelta_average', 'timedelta_div', 'timedelta_seconds', 'timedelta_to_str', ) SECONDS_PER_DAY = 86400 SECONDS_PER_WEEK = 604800 TIMEDELTA_FORMAT = 'G:i' TIMEDELTA_FORMATS = { 'd': ('%(days)02d', r'(?P\d{2,})'), 'f': ('%(weeks)d%(short_weeks_label)s ' '%(week_days)d%(short_week_days_label)s ' '%(day_hours)d:%(hour_minutes)02d:%(minute_seconds)02d', r'((?P\d+)%(short_weeks_label)s )?' r'((?P\d{1,})%(short_week_days_label)s )?' r'(?P\d{1,2})\:(?P\d{2})(\:(?P\d{2}))?'), 'F': ('%(weeks)d %(weeks_label)s, %(week_days)d %(week_days_label)s, ' '%(day_hours)d:%(hour_minutes)02d:%(minute_seconds)02d', r'((?P\d+) %(weeks_label)s, )?' r'((?P\d{1,}) %(week_days_label)s, )?' r'(?P\d{1,2})\:(?P\d{2})(\:(?P\d{2}))?'), 'g': ('%(day_hours)d', r'(?P\d{1,2})'), 'G': ('%(hours)d', r'(?P\d+)'), 'h': ('%(day_hours)02d', r'(?P\d{2})'), 'H': ('%(hours)02d', r'(?P\d{2,})'), 'i': ('%(hour_minutes)02d', r'(?P\d{2})'), 'I': ('%(minutes)02d', r'(?P\d{2,})'), 'j': ('%(days)d', r'(?P\d+)'), 'l': ('%(days_label)s', r'%(days_label)s'), 'L': ('%(weeks_label)s', r'%(weeks_label)s'), 'm': ('%(week_days_label)s', r'%(week_days_label)s'), 'r': ('%(days)d%(short_days_label)s ' '%(day_hours)d:%(hour_minutes)02d:%(minute_seconds)02d', r'((?P\d+)%(short_days_label)s )?' r'(?P\d{1,2})\:(?P\d{2})\:(?P\d{2})'), 'R': ('%(days)d %(days_label)s, ' '%(day_hours)d:%(hour_minutes)02d:%(minute_seconds)02d', r'((?P\d+) %(days_label)s, )?' r'(?P\d{1,2})\:(?P\d{2})\:(?P\d{2})'), 's': ('%(minute_seconds)02d', r'(?P\d{2})'), 'S': ('%(seconds)02d', r'(?P{2,})'), '': ('%(microseconds)d', r'(?P\d{1,})'), 'w': ('%(week_days)d', r'(?P\d{1})'), 'W': ('%(weeks)d', r'(?P\d+)'), } def str_to_timedelta(value: str, fmt: str=None) -> Optional[datetime.timedelta]: """ Convert string value to timedelta instance according to the given format. If format not set function tries to load timedelta using default ``TIMEDELTA_FORMAT`` and then both of magic "full" formats. You should also specify list of formats and function tries to convert to timedelta using each of formats in list. First matched format would return the converted timedelta instance. If user specified format, but function cannot convert string to new timedelta instance - ``ValueError`` would be raised. But if user did not specify the format, function would be fail silently and return ``None`` as result. :param value: String representation of timedelta. :param fmt: Format to use for conversion. """ def timedelta_kwargs(data: DictStrInt) -> DictStrInt: """ Convert day_hours, hour_minutes, minute_seconds, week_days and weeks to timedelta seconds. """ seconds = data.get('seconds', 0) seconds += data.get('day_hours', 0) * 3600 seconds += data.pop('hour_minutes', 0) * 60 seconds += data.pop('minute_seconds', 0) seconds += data.pop('week_days', 0) * SECONDS_PER_DAY seconds += data.pop('weeks', 0) * SECONDS_PER_WEEK data.update({'seconds': seconds}) return data if not isinstance(value, str): raise ValueError( 'Value should be a "str" instance. You use {0}.' .format(type(value))) user_fmt = fmt if isinstance(fmt, (list, tuple)): formats = list(fmt) elif fmt is None: formats = [TIMEDELTA_FORMAT, 'F', 'f'] else: formats = [fmt] locale_data = { 'days_label': '({0}|{1})'.format('day', 'days'), 'short_days_label': 'd', 'short_week_days_label': 'd', 'short_weeks_label': 'w', 'week_days_label': '({0}|{1})'.format('day', 'days'), 'weeks_label': '({0}|{1})'.format('week', 'weeks'), } regexps = [] for item in formats: processed = r'^' for part in item: if part in TIMEDELTA_FORMATS: part = TIMEDELTA_FORMATS[part][1] % locale_data else: part = re.escape(part) processed += part processed += r'$' regexps.append(processed) for regexp in regexps: timedelta_re = re.compile(regexp) matched = timedelta_re.match(value) if matched: data = { key: to_int(value) or 0 for key, value in matched.groupdict().items()} return datetime.timedelta(**timedelta_kwargs(data)) if user_fmt: raise ValueError( 'Cannot convert {0!r} to timedelta instance, using {1!r} format.' .format(value, user_fmt)) return None def timedelta_average(*values: datetime.timedelta) -> datetime.timedelta: r"""Compute the arithmetic mean for timedeltas list. :param \*values: Timedelta instances to process. """ if isinstance(values[0], (list, tuple)): values = values[0] return sum(values, datetime.timedelta()) // len(values) def timedelta_div(first: datetime.timedelta, second: datetime.timedelta) -> Optional[float]: """Implement divison for timedelta instances. :param first: First timedelta instance. :param second: Second timedelta instance. """ first_seconds = timedelta_seconds(first) second_seconds = timedelta_seconds(second) if not second_seconds: return None return first_seconds / second_seconds def timedelta_seconds(value: datetime.timedelta) -> int: """Return full number of seconds from timedelta. By default, Python returns only one day seconds, not all timedelta seconds. :param value: Timedelta instance. """ return SECONDS_PER_DAY * value.days + value.seconds def timedelta_to_str(value: datetime.timedelta, fmt: str=None) -> str: """Display the timedelta formatted according to the given string. You should use global setting ``TIMEDELTA_FORMAT`` to specify default format to this function there (like ``DATE_FORMAT`` for builtin ``date`` template filter). Default value for ``TIMEDELTA_FORMAT`` is ``'G:i'``. Format uses the same policy as Django ``date`` template filter or PHP ``date`` function with several differences. Available format strings: +------------------+-----------------------------+------------------------+ | Format character | Description | Example output | +==================+=============================+========================+ | ``a`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``A`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``b`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``B`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``c`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``d`` | Total days, 2 digits with | ``'01'``, ``'41'`` | | | leading zeros. Do not | | | | combine with ``w`` format. | | +------------------+-----------------------------+------------------------+ | ``D`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``f`` | Magic "full" format with | ``'2w 4d 1:28:07'`` | | | short labels. | | +------------------+-----------------------------+------------------------+ | ``F`` | Magic "full" format with | ``'2 weeks, 4 days, | | | normal labels. | 1:28:07'`` | +------------------+-----------------------------+------------------------+ | ``g`` | Day, not total, hours | ``'0'`` to ``'23'`` | | | without leading zeros. To | | | | use with ``d``, ``j``, or | | | | ``w``. | | +------------------+-----------------------------+------------------------+ | ``G`` | Total hours without | ``'1'``, ``'433'`` | | | leading zeros. Do not | | | | combine with ``g`` or | | | | ``h`` formats. | | +------------------+-----------------------------+------------------------+ | ``h`` | Day, not total, hours with | ``'00'`` to ``'23'`` | | | leading zeros. To use with | | | | ``d`` or ``w``. | | +------------------+-----------------------------+------------------------+ | ``H`` | Total hours with leading | ``'01', ``'433'`` | | | zeros. Do not combine with | | | | ``g`` or ``h`` formats. | | +------------------+-----------------------------+------------------------+ | ``i`` | Hour, not total, minutes, 2 | ``00`` to ``'59'`` | | | digits with leading zeros | | | | To use with ``g``, ``G``, | | | | ``h`` or ``H`` formats. | | +------------------+-----------------------------+------------------------+ | ``I`` | Total minutes, 2 digits or | ``'01'``, ``'433'`` | | | more with leading zeros. Do | | | | not combine with ``i`` | | | | format. | | +------------------+-----------------------------+------------------------+ | ``j`` | Total days, one or 2 digits | ``'1'``, ``'41'`` | | | without leading zeros. Do | | | | not combine with ``w`` | | | | format. | | +------------------+-----------------------------+------------------------+ | ``J`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``l`` | Days long label. | ``'day'`` or | | | Pluralized and localized. | ``'days'`` | +------------------+-----------------------------+------------------------+ | ``L`` | Weeks long label. | ``'week'`` or | | | Pluralized and localized. | ``'weeks'`` | +------------------+-----------------------------+------------------------+ | ``m`` | Week days long label. | ``'day'`` or | | | Pluralized and localized. | ``'days'`` | +------------------+-----------------------------+------------------------+ | ``M`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``n`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``N`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``O`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``P`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``r`` | Standart Python timedelta | ``'18 d 1:28:07'`` | | | representation with short | | | | labels. | | +------------------+-----------------------------+------------------------+ | ``R`` | Standart Python timedelta | ``'18 days, 1:28:07'`` | | | representation with normal | | | | labels. | | +------------------+-----------------------------+------------------------+ | ``s`` | Minute, not total, seconds, | ``'00'`` to ``'59'`` | | | 2 digits with leading | | | | zeros. To use with ``i`` or | | | | ``I``. | | +------------------+-----------------------------+------------------------+ | ``S`` | Total seconds. 2 digits or | ``'00'``, ``'433'`` | | | more with leading zeros. Do | | | | not combine with ``s`` | | | | format. | | +------------------+-----------------------------+------------------------+ | ``t`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``T`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``u`` | Second, not total, | ``0`` to ``999999`` | | | microseconds. | | +------------------+-----------------------------+------------------------+ | ``U`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``w`` | Week, not total, days, one | ``0`` to ``6`` | | | digit without leading | | | | zeros. To use with ``W``. | | +------------------+-----------------------------+------------------------+ | ``W`` | Total weeks, one or more | ``'1'``, ``'41'`` | | | digits without leading | | | | zeros. | | +------------------+-----------------------------+------------------------+ | ``y`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``Y`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``z`` | Not implemented. | | +------------------+-----------------------------+------------------------+ | ``Z`` | Not implemented. | | +------------------+-----------------------------+------------------------+ For example, :: >>> import datetime >>> from rororo.timedelta import timedelta_to_str >>> delta = datetime.timedelta(seconds=99660) >>> timedelta_to_str(delta) ... '27:41' >>> timedelta_to_str(delta, 'r') ... '1d 3:41:00' >>> timedelta_to_str(delta, 'f') ... '1d 3:41' >>> timedelta_to_str(delta, 'W L, w l, H:i:s') ... '0 weeks, 1 day, 03:41:00' Couple words about magic "full" formats. These formats show weeks number with week label, days number with day label and seconds only if weeks number, days number or seconds greater that zero. For example, :: >>> import datetime >>> from rororo.timedelta import timedelta_to_str >>> delta = datetime.timedelta(hours=12) >>> timedelta_to_str(delta, 'f') ... '12:00' >>> timedelta_to_str(delta, 'F') ... '12:00' >>> delta = datetime.timedelta(hours=12, seconds=30) >>> timedelta_to_str(delta, 'f') ... '12:00:30' >>> timedelta_to_str(delta, 'F') ... '12:00:30' >>> delta = datetime.timedelta(hours=168) >>> timedelta_to_str(delta, 'f') ... '1w 0:00' >>> timedelta_to_str(delta, 'F') ... '1 week, 0:00' :param value: Timedelta instance to convert to string. :param fmt: Format to use for conversion. """ # Only ``datetime.timedelta`` instances allowed for this function if not isinstance(value, datetime.timedelta): raise ValueError( 'Value should be a "datetime.timedelta" instance. You use {0}.' .format(type(value))) # Generate total data days = value.days microseconds = value.microseconds seconds = timedelta_seconds(value) hours = seconds // 3600 minutes = seconds // 60 weeks = days // 7 # Generate collapsed data day_hours = hours - days * 24 hour_minutes = minutes - hours * 60 minute_seconds = seconds - minutes * 60 week_days = days - weeks * 7 days_label = 'day' if days % 10 == 1 else 'days' short_days_label = 'd' short_week_days_label = 'd' short_weeks_label = 'w' week_days_label = 'day' if week_days % 10 == 1 else 'days' weeks_label = 'week' if weeks % 10 == 1 else 'weeks' # Collect data data = locals() fmt = fmt or TIMEDELTA_FORMAT processed = '' for part in fmt: if part in TIMEDELTA_FORMATS: is_full_part = part in ('f', 'F') is_repr_part = part in ('r', 'R') part = TIMEDELTA_FORMATS[part][0] if is_full_part or is_repr_part: if is_repr_part and not days: part = part.replace('%(days)d', '') part = part.replace('%(days_label)s,', '') part = part.replace('%(short_days_label)s', '') if is_full_part and not minute_seconds: part = part.replace(':%(minute_seconds)02d', '') if is_full_part and not weeks: part = part.replace('%(weeks)d', '') part = part.replace('%(short_weeks_label)s', '') part = part.replace('%(weeks_label)s,', '') if is_full_part and not week_days: part = part.replace('%(week_days)d', '') part = part.replace('%(short_week_days_label)s', '') part = part.replace('%(week_days_label)s,', '') part = part.strip() part = ' '.join(part.split()) processed += part return processed % data PK!8^  rororo/utils.py""" ============ rororo.utils ============ Different utility functions, which are common used in web development, like converting string to int or bool. """ from distutils.util import strtobool from typing import Any, Optional, Union from .annotations import T def to_bool(value: Any) -> bool: """Convert string or other Python object to boolean. **Rationalle** Passing flags is one of the most common cases of using environment vars and as values are strings we need to have an easy way to convert them to boolean Python value. Without this function int or float string values can be converted as false positives, e.g. ``bool('0') => True``, but using this function ensure that digit flag be properly converted to boolean value. :param value: String or other value. """ return bool(strtobool(value) if isinstance(value, str) else value) def to_int(value: str, default: T=None) -> Union[int, Optional[T]]: """Convert given value to int. If conversion failed, return default value without raising Exception. :param value: Value to convert to int. :param default: Default value to use in case of failed conversion. """ try: return int(value) except (TypeError, ValueError): return default PK!Χerororo-1.2.0.dist-info/LICENSECopyright (c) 2013-2017, Igor Davydenko All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * Neither the name of the rororo nor the names of its contributors may be use to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. PK!H\TTrororo-1.2.0.dist-info/WHEEL 1 0 нR \I$ơ7.ZON `h6oi14m,b4>4ɛpK>X;baP>PK!HB: rororo-1.2.0.dist-info/METADATAVmo6_qa/&kqNc^q%FfCI9տQl9ys;3⿸BU)#+y F?yMO+2˒&g@ANpKX6ʄ2EUuֈ\잣Zyo¼Zg vNdڨ.w>(sOTc $!6MQ(M^L6 rtJ]Hp&2^Y>ħXroonAj}f@t (ZHvt/jߤ0E0I嘗L+OS m0)}CO_+oi> EܿcA='"e0oeisE|Xf*jۂy v|ÞRp/o"CIǩ'Yz;e 뵾}P*9f so: e@=vY\4O1/R\*֍ Aɞ~`M6[aQ,#} 8"wfwO[QmD5㮍'3*/&P,OU;ۉ91hE~ԧ5^ћ>$J 8yaeI_]TXi|A♋3dfM! ") }-G$bXyƥ9YsyRŁg/R,'>8U֐A}8$±1ty YtBqjjom"誺}F 7]ZcÙu+~ YփaSS'-[*KoR{ݑ%9hα*iW o>[7WiwC>7UeU(?PK!y_CCrororo/__init__.pyPK!dۤ srororo/aio.pyPK!%]XLL|rororo/annotations.pyPK!*XbBrororo/logger.pyPK!$rororo/py.typedPK!YY$rororo/schemas/__init__.pyPK!|A1%%r&rororo/schemas/empty.pyPK!iEE'rororo/schemas/exceptions.pyPK!K)rororo/schemas/py.typedPK!؁E]])rororo/schemas/schema.pyPK!drBBGrororo/schemas/utils.pyPK!O faKrororo/schemas/validators.pyPK!