===== json_syntax/__init__.py =====

"""
The JSON syntax library is a combinatorial parser / generator library for managing
conversion of Python objects to and from common JSON types. It's not strictly limited
to JSON, but that's the major use case.
"""
from .ruleset import RuleSet
from .std import (  # noqa
    atoms,
    decimals,
    decimals_as_str,
    floats,
    floats_nan_str,
    iso_dates,
    optional,
    enums,
    faux_enums,
    lists,
    sets,
    dicts,
)
from .attrs import attrs_classes, named_tuples, tuples
from .unions import unions
from .helpers import JSON2PY, PY2JSON, INSP_PY, INSP_JSON  # noqa


def std_ruleset(
    floats=floats,
    decimals=decimals,
    dates=iso_dates,
    enums=enums,
    lists=lists,
    sets=sets,
    unions=unions,
    extras=(),
    custom=RuleSet,
    cache=None,
):
    """
    Constructs a RuleSet with the provided rules.

    The arguments here are to make it easy to override. For example, to replace
    ``decimals`` with ``decimals_as_str``, just call ``std_ruleset(decimals=decimals_as_str)``.
    """
    return custom(
        atoms,
        floats,
        decimals,
        dates,
        optional,
        enums,
        lists,
        attrs_classes,
        sets,
        dicts,
        named_tuples,
        tuples,
        unions,
        *extras,
        cache=cache,
    )
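# --- Usage sketch (illustrative, not part of the package) ---
# Building the standard ruleset and looking up conversion actions for a concrete type.
# The verbs are the constants re-exported above; the values in the comments are what
# the rules should produce, inferred from the code rather than quoted from docs.
from datetime import date
from typing import List

from json_syntax import std_ruleset, JSON2PY, PY2JSON

rules = std_ruleset()
encode = rules.lookup(verb=PY2JSON, typ=List[date])
decode = rules.lookup(verb=JSON2PY, typ=List[date])

payload = encode([date(2019, 4, 1)])  # -> ["2019-04-01"]
restored = decode(payload)            # -> [datetime.date(2019, 4, 1)]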
===== json_syntax/action_v1.py =====

from .helpers import ErrorContext, err_ctx

from datetime import date, datetime, time
import math


def check_parse_error(value, parser, error):
    try:
        parser(value)
    except error:
        return False
    else:
        return True


def check_isinst(value, typ):
    return isinstance(value, typ)


def check_has_type(value, typ):
    return type(value) == typ


def convert_float(value):
    value = float(value)
    if math.isfinite(value):
        return value
    elif math.isnan(value):
        return "NaN"
    elif value < 0.0:
        return "-Infinity"
    else:
        return "Infinity"


def check_float(value):
    return (
        isinstance(value, (int, float))
        or isinstance(value, str)
        and value.lower()
        in ("nan", "inf", "infinity", "-inf", "-infinity", "+inf", "+infinity")
    )


def convert_enum_str(value, typ):
    return typ(value).name


def convert_none(value):
    if value is not None:
        raise ValueError("Expected None")
    return None


def check_str_enum(value, mapping):
    return isinstance(value, str) and value in mapping


def convert_str_enum(value, mapping):
    return mapping[value]


if hasattr(datetime, "fromisoformat"):
    convert_date = date.fromisoformat
    convert_datetime = datetime.fromisoformat
    convert_time = time.fromisoformat
else:
    from dateutil.parser import isoparser

    instance = isoparser(sep="T")
    convert_date = instance.parse_isodate
    convert_datetime = instance.isoparse
    convert_time = instance.parse_isotime
    del instance


def convert_optional(value, inner):
    if value is None:
        return None
    return inner(value)


def check_optional(value, inner):
    return value is None or inner(value)


def convert_collection(value, inner, con):
    return con(
        err_ctx("[{}]".format(i), lambda: inner(val)) for i, val in enumerate(value)
    )


def check_collection(value, inner, con):
    return isinstance(value, con) and all(
        err_ctx("[{}]".format(i), lambda: inner(val)) for i, val in enumerate(value)
    )


def convert_mapping(value, key, val, con):
    return con(err_ctx(k, lambda: (key(k), val(v))) for k, v in value.items())


def check_mapping(value, key, val, con):
    return isinstance(value, con) and all(
        err_ctx(k, lambda: key(k) and val(v)) for k, v in value.items()
    )


def convert_dict_to_attrs(value, pre_hook, inner_map, con):
    value = pre_hook(value)
    args = {}
    for name, inner in inner_map:
        with ErrorContext("[{!r}]".format(name)):
            try:
                arg = value[name]
            except KeyError:
                pass
            else:
                args[name] = inner(arg)
    return con(**args)


def check_dict(value, inner_map, pre_hook):
    value = pre_hook(value)
    if not isinstance(value, dict):
        return False
    for name, inner, required in inner_map:
        with ErrorContext("[{!r}]".format(name)):
            try:
                arg = value[name]
            except KeyError:
                if required:
                    return False
            else:
                if not inner(arg):
                    return False
    return True


def convert_attrs_to_dict(value, post_hook, inner_map):
    out = {}
    for name, inner, default in inner_map:
        with ErrorContext("." + name):
            field = getattr(value, name)
            if field == default:
                continue
            out[name] = inner(field)
    if post_hook is not None:
        out = getattr(value, post_hook)(out)
    return out


def convert_tuple_as_list(value, inner, con):
    return con(
        err_ctx("[{}]".format(i), lambda: cvt(val))
        for i, (val, cvt) in enumerate(zip(value, inner))
    )


def check_tuple_as_list(value, inner, con):
    return (
        isinstance(value, con)
        and len(value) == len(inner)
        and all(
            err_ctx("[{}]".format(i), lambda: chk(val))
            for i, (val, chk) in enumerate(zip(value, inner))
        )
    )


def check_union(value, steps):
    return any(err_ctx(name, lambda: step(value)) for step, name in steps)


def convert_union(value, steps, typename):
    for check, convert, name in steps:
        with ErrorContext(name):
            if check(value):
                return convert(value)
    raise ValueError("Expected value of type {} got {!r}".format(typename, value))

===== json_syntax/attrs.py =====

from .helpers import (
    JSON2PY,
    PY2JSON,
    INSP_JSON,
    INSP_PY,
    SENTINEL,
    has_origin,
    identity,
    is_attrs_field_required,
    issub_safe,
    resolve_fwd_ref,
)
from .action_v1 import (
    check_dict,
    check_isinst,
    check_tuple_as_list,
    convert_attrs_to_dict,
    convert_dict_to_attrs,
    convert_tuple_as_list,
)

from functools import partial


def attrs_classes(
    verb,
    typ,
    ctx,
    pre_hook="__json_pre_decode__",
    post_hook="__json_post_encode__",
    check="__json_check__",
):
    """
    Handle an ``@attr.s`` or ``@dataclass`` decorated class.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON):
        return
    try:
        fields = typ.__attrs_attrs__
    except AttributeError:
        try:
            fields = typ.__dataclass_fields__
        except AttributeError:
            return
        else:
            fields = fields.values()

    if verb == INSP_PY:
        return partial(check_isinst, typ=typ)

    inner_map = []
    for field in fields:
        if field.init or verb == PY2JSON:
            tup = (
                field.name,
                ctx.lookup(
                    verb=verb, typ=resolve_fwd_ref(field.type, typ), accept_missing=True
                ),
            )
            if verb == PY2JSON:
                tup += (field.default,)
            elif verb == INSP_JSON:
                tup += (is_attrs_field_required(field),)
            inner_map.append(tup)

    if verb == JSON2PY:
        pre_hook_method = getattr(typ, pre_hook, identity)
        return partial(
            convert_dict_to_attrs,
            pre_hook=pre_hook_method,
            inner_map=tuple(inner_map),
            con=typ,
        )
    elif verb == PY2JSON:
        post_hook = post_hook if hasattr(typ, post_hook) else None
        return partial(
            convert_attrs_to_dict, post_hook=post_hook, inner_map=tuple(inner_map)
        )
    elif verb == INSP_JSON:
        check = getattr(typ, check, None)
        if check:
            return check
        pre_hook_method = getattr(typ, pre_hook, identity)
        return partial(check_dict, inner_map=inner_map, pre_hook=pre_hook_method)


def named_tuples(verb, typ, ctx):
    """
    Handle a ``NamedTuple(name, [('field', type), ('field', type)])`` type.

    Also handles a ``collections.namedtuple`` if you have a fallback handler.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON) or not issub_safe(typ, tuple):
        return
    try:
        fields = typ._field_types
    except AttributeError:
        try:
            fields = typ._fields
        except AttributeError:
            return
        fields = [(name, None) for name in fields]
    else:
        fields = fields.items()

    if verb == INSP_PY:
        return partial(check_isinst, typ=typ)

    defaults = {}
    defaults.update(getattr(typ, "_fields_defaults", ()))
    defaults.update(getattr(typ, "_field_defaults", ()))

    inner_map = []
    for name, inner in fields:
        tup = (
            name,
            ctx.lookup(verb=verb, typ=resolve_fwd_ref(inner, typ), accept_missing=True),
        )
        if verb == PY2JSON:
            tup += (defaults.get(name, SENTINEL),)
        elif verb == INSP_JSON:
            tup += (name not in defaults,)
        inner_map.append(tup)

    if verb == JSON2PY:
        return partial(
            convert_dict_to_attrs,
            pre_hook=identity,
            inner_map=tuple(inner_map),
            con=typ,
        )
    elif verb == PY2JSON:
        return partial(
            convert_attrs_to_dict, post_hook=None, inner_map=tuple(inner_map)
        )
    elif verb == INSP_JSON:
        return partial(check_dict, pre_hook=identity, inner_map=tuple(inner_map))


def tuples(verb, typ, ctx):
    """
    Handle a ``Tuple[type, type, type]`` product type. Use a ``NamedTuple`` if you
    don't want a list.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON) or not has_origin(typ, tuple):
        return
    args = typ.__args__
    if Ellipsis in args:
        # This is a homogeneous tuple, use the lists rule.
        return
    inner = [ctx.lookup(verb=verb, typ=arg) for arg in args]
    if verb == JSON2PY:
        return partial(convert_tuple_as_list, inner=inner, con=tuple)
    elif verb == PY2JSON:
        return partial(convert_tuple_as_list, inner=inner, con=list)
    elif verb == INSP_PY:
        return partial(check_tuple_as_list, inner=inner, con=tuple)
    elif verb == INSP_JSON:
        return partial(check_tuple_as_list, inner=inner, con=list)

===== json_syntax/cache.py =====

from warnings import warn
import threading


class UnhashableType(UserWarning):
    pass


class ForwardAction:
    """
    A mutable callable.

    Since actions are simply functions, this lets us create a promise of a function and
    replace it when we have the actual function ready. This is a simple way to handle
    cycles in types.
    """

    __slots__ = ("__call__",)

    def __init__(self, call):
        self.__call__ = call

    def __repr__(self):
        return "<forward {!r}>".format(self.__call__)


class SimpleCache:
    def __init__(self):
        self.cache = {}

    def get(self, verb, typ):
        result = self._lookup(verb, typ)
        return result if result is not NotImplemented else None

    def _lookup(self, verb, typ):
        """
        Handle unhashable types by warning about them.
        """
        try:
            return self.cache.get((verb, typ))
        except TypeError:
            warn(
                "Type {} is unhashable; json_syntax probably can't handle this".format(
                    typ
                ),
                category=UnhashableType,
            )
            return NotImplemented

    def in_flight(self, verb, typ):
        """
        Called when we begin determining the action for a type. We construct a forward
        action that will be fulfilled by the ``complete`` call.
        """
        if self._lookup(verb, typ) is None:

            def unfulfilled(value):
                # This can't be pickled, which is a good thing.
                raise TypeError(
                    "Forward reference was never fulfilled to {} for {}".format(
                        verb, typ
                    )
                )

            forward = ForwardAction(unfulfilled)
            self.cache[verb, typ] = forward
            return forward

    def de_flight(self, verb, typ, forward):
        """
        If a lookup fails, this removes the entry so that further attempts can be made.
        """
        present = self._lookup(verb, typ)
        if present is forward:
            del self.cache[verb, typ]

    def complete(self, verb, typ, action):
        """
        Once a type is complete, we fulfill any ForwardActions and replace the cache
        entry with the actual action.
        """
        present = self._lookup(verb, typ)
        if present is NotImplemented:
            return  # Unhashable.
        elif present is None:
            self.cache[verb, typ] = action
        elif isinstance(present, ForwardAction):
            present.__call__ = action
            # Replace the cache entry; if it's never been used, let the ForwardAction
            # be garbage collected.
            self.cache[verb, typ] = action


class ThreadLocalCache(SimpleCache):
    """
    Avoids threads conflicting while looking up rules by keeping the cache in thread
    local storage. You can also prevent conflicts by looking up rules during module
    loading.
    """

    def __init__(self):
        self._local = threading.local()

    @property
    def cache(self):
        local = self._local
        try:
            return local.cache
        except AttributeError:
            _cache = local.cache = {}
            return _cache

===== json_syntax/examples/README.md =====

# The flags rule

This rule lets you use plain strings as flag values without converting all of your
Enums to strings, which is what the `faux_enums` rule does.

## Demonstrates

* How to write a rule
* How to write an action
* How to write a fake type that's compatible with `typing.Union`

## Caveats

* Requires Python 3.7
* A user could mistakenly create a Flag instance
* You'd probably be better off using enums

===== json_syntax/examples/__init__.py =====

"""
Examples of additional rules are in this directory.
"""

===== json_syntax/examples/flags.py =====

"""
This module constructs its own fake type and a rule to support it. This lets you
construct a quick set of enums that are represented as strings.
"""
from ..helpers import JSON2PY, PY2JSON, INSP_JSON, INSP_PY

from functools import partial


class Flag(type):
    """
    An example of a custom type that lets you quickly create string-only flags.

    This also demonstrates a technique that makes it possible to create a fake type
    that can be used within ``typing.Union``.

    Thanks to __class_getitem__, you can invoke this as ``Flag['foo', 'bar', 'etc']``,
    but this requires Python 3.7!
    """

    def __new__(cls, *args, **kwds):
        """This is necessary to be a subclass of `type`, which is necessary to be used
        in a Union."""
        return super().__new__(cls, cls.__name__, (), {})

    def __init__(self, *elems):
        if not elems:
            raise TypeError("Flag must be called with at least one string argument.")
        if not all(isinstance(elem, str) for elem in elems):
            raise TypeError("Flag elements must all be strings.")
        self.elems = frozenset(elems)
        if len(self.elems) != len(elems):
            raise TypeError("Duplicate elements are prohibited.")

    def __class_getitem__(cls, elems):
        return cls(*elems) if isinstance(elems, tuple) else cls(elems)

    def __repr__(self):
        return f'{self.__class__.__name__}[{", ".join(map(repr, self.elems))}]'


def _check_flag(elems, value):
    """
    Checks that a value is a member of a set of flags.

    Note that we use a top-level function and `partial`. The trouble with lambdas or
    local defs is that they can't be pickled because they're inaccessible to the
    unpickler. If you don't intend to pickle your encoders, though, they're completely
    fine to use in rules.
    """
    return isinstance(value, str) and value in elems


def _convert_flag(elems, value):
    """
    Checks the value is in elems and returns it.
    """
    if value not in elems:
        raise ValueError(f'Expect {value!r} to be one of {", ".join(map(repr, elems))}')
    return value


def flags(*, verb, typ, ctx):
    """
    A simple rule to allow certain strings as flag values, but without converting them
    to an actual Enum. This rule is triggered with a fake type
    ``Flag['string', 'string', 'string']``.
    """
    if not isinstance(typ, Flag):
        return
    if verb in (JSON2PY, PY2JSON):
        return partial(_convert_flag, typ.elems)
    elif verb in (INSP_JSON, INSP_PY):
        return partial(_check_flag, typ.elems)

===== json_syntax/examples/loose_dates.py =====

from json_syntax.helpers import JSON2PY, PY2JSON, INSP_JSON, INSP_PY
from json_syntax.action_v1 import check_parse_error, check_has_type

from datetime import date, datetime
from functools import partial

"""
This example works around common date issues.

The standard rules use the standard library's fromisoformat and isoformat methods, to
abide by the principle of least surprise. But it's pretty common to have to consume a
datetime in a date field, and it may also be the case that you want to discard the
timestamp.

(Note: requires python3.7 or greater.)
"""


def convert_date_loosely(value):
    return datetime.fromisoformat(value).date()


def iso_dates_loose(verb, typ, ctx):
    if typ == date:
        if verb == PY2JSON:
            return date.isoformat
        elif verb == JSON2PY:
            return convert_date_loosely
        elif verb == INSP_PY:
            return partial(check_has_type, typ=date)
        elif verb == INSP_JSON:
            return partial(
                check_parse_error,
                parser=convert_date_loosely,
                error=(TypeError, ValueError),
            )

===== json_syntax/helpers.py =====

from importlib import import_module
import logging
import typing as t
import sys

_eval_type = getattr(t, "_eval_type", None)

logger = logging.getLogger(__name__)

JSON2PY = "json_to_python"
PY2JSON = "python_to_json"
INSP_JSON = "inspect_json"
INSP_PY = "inspect_python"
NoneType = type(None)
SENTINEL = object()

python_minor = sys.version_info[:2]


def identity(value):
    return value


def has_origin(typ, origin, num_args=None):
    """
    Determines if a concrete class (a generic class with arguments) matches an origin
    and has a specified number of arguments.

    The typing classes use dunder properties such that ``__origin__`` is the generic
    class and ``__args__`` are the type arguments.

    Note: in python3.6, the ``__origin__`` attribute changed to reflect native types.
    This call attempts to work around that so that python3.5 "just works."
    """
    t_origin = get_origin(typ)
    if not isinstance(origin, tuple):
        origin = (origin,)
    return t_origin in origin and (num_args is None or len(typ.__args__) == num_args)


def get_origin(typ):
    try:
        t_origin = typ.__origin__
    except AttributeError:
        return None
    else:
        return _origin_pts(t_origin)


try:
    _Generic = t.GenericMeta
except AttributeError:
    _Generic = t._GenericAlias


def is_generic(typ):
    """
    Return true iff the instance (which should be a type value) is a generic type.

    `typing` module notes:

    3.5, 3.6: typing.List[int] is an instance of typing.GenericMeta
    3.7: typing.List[int] is an instance of typing._GenericAlias
    """
    return isinstance(typ, _Generic)


if python_minor < (3, 7):
    import collections as c

    _map = [
        (t.Tuple, tuple),
        (t.List, list),
        (t.Dict, dict),
        (t.Callable, callable),
        (t.Type, type),
        (t.Set, set),
        (t.FrozenSet, frozenset),
    ]
    seen = {prov for prov, stable in _map}

    from collections import abc

    for name, generic in vars(t).items():
        if not is_generic(generic) or generic in seen:
            continue
        for check in getattr(abc, name, None), getattr(c, name.lower(), None):
            if check:
                _map.append((generic, check))
                continue

    _pts = {prov: stable for prov, stable in _map}
    _stp = {stable: prov for prov, stable in _map}

    def _origin_pts(origin, _pts=_pts):
        """
        Convert the __origin__ of a generic type returned by the provisional typing API
        (python3.5) to the stable version.
        """
        return _pts.get(origin, origin)

    def _origin_stp(origin, _stp=_stp):
        """
        Convert the __origin__ of a generic type in the stable typing API (python3.6+)
        to the provisional version.
        """
        return _stp.get(origin, origin)

    del _pts
    del _stp
    del _map
    del seen
    del abc
    del c
else:
    _origin_pts = _origin_stp = identity


def issub_safe(sub, sup):
    """
    Safe version of issubclass. Tries to be consistent in handling generic types.

    `typing` module notes:

    3.5, 3.6: issubclass(t.List[int], list) returns true
    3.7: issubclass(t.List[int], list) raises a TypeError
    """
    try:
        return not is_generic(sub) and issubclass(sub, sup)
    except TypeError:
        return False


def resolve_fwd_ref(typ, context_class):
    """
    Tries to resolve a forward reference given a containing class. This does nothing
    for regular types.
    """
    resolved = None
    try:
        namespace = vars(import_module(context_class.__module__))
    except AttributeError:
        logger.warning("Couldn't determine module of %r", context_class)
    else:
        resolved = _eval_type(typ, namespace, {})
    if resolved is None:
        return typ
    else:
        return resolved


if _eval_type is None:
    # If typing's internal API changes, we have tests that break.
    def resolve_fwd_ref(typ, context_class):  # noqa
        return typ


_missing_values = set()
try:
    import attr

    _missing_values.add(attr.NOTHING)
except ImportError:
    pass
try:
    import dataclasses

    _missing_values.add(dataclasses.MISSING)
except ImportError:
    pass


def is_attrs_field_required(field):
    """
    Determine if a field's default value is missing, i.e. the field is required.
    """
    if field.default not in _missing_values:
        return False
    try:
        factory = field.default_factory
    except AttributeError:
        return True
    else:
        return factory in _missing_values


def _add_context(context, exc):
    try:
        if exc is None:
            return
        args = list(exc.args)
        arg_num, point = getattr(exc, "_context", (None, None))
        if arg_num is None:
            for arg_num, val in enumerate(args):
                if isinstance(val, str):
                    args[arg_num] = args[arg_num] + "; at " if val else "At "
                    break
            else:  # This 'else' clause runs if we don't `break`
                arg_num = len(args)
                args.append("At ")
            point = len(args[arg_num])
        arg = args[arg_num]
        args[arg_num] = arg[:point] + str(context) + arg[point:]
        exc.args = tuple(args)
        exc._context = (arg_num, point)
    except Exception:
        # Swallow exceptions to avoid adding confusion.
        pass


class ErrorContext:
    """
    Inject contextual information into an exception message. This won't work for some
    exceptions like OSError that ignore changes to `args`; likely not an issue for this
    library. There is a negligible performance hit if there is no exception.

    >>> with ErrorContext('.foo'):
    ...     with ErrorContext('[0]'):
    ...         with ErrorContext('.qux'):
    ...             1 / 0
    Traceback (most recent call last):
    ZeroDivisionError: division by zero; at .foo[0].qux

    The `__exit__` method will catch the exception and look for a `_context` attribute
    assigned to it. If none exists, it appends `; at ` and the context string to the
    first string argument.

    As the exception walks up the stack, outer ErrorContexts will be called. They will
    see the `_context` attribute and insert their context immediately after `; at ` and
    before the existing context. Thus, in the example above:

    ('division by zero',)                  -- the original message
    ('division by zero; at .qux',)         -- the innermost context
    ('division by zero; at [0].qux',)
    ('division by zero; at .foo[0].qux',)  -- the outermost context

    For simplicity, the method doesn't attempt to inject whitespace. To represent
    names, consider surrounding them with angle brackets, e.g. ``<name>``.
    """

    def __init__(self, context):
        self.context = context

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        _add_context(self.context, exc_value)


def err_ctx(context, func):
    """
    Execute a callable, decorating exceptions raised with error context.

    ``err_ctx(context, func)`` has the same effect as:

    >>> with ErrorContext(context):
    ...     return func()
    """
    try:
        return func()
    except Exception as exc:
        _add_context(context, exc)
        raise
===== json_syntax/ruleset.py =====

from .cache import SimpleCache

import logging

logger = logging.getLogger(__name__)
TRACE = 5


def trace(fmt, *args, _logger=logger, _TRACE=TRACE):
    "Trace a log message. Avoids issues with applications setting `style`."
    if _logger.isEnabledFor(_TRACE):
        _logger.log(_TRACE, fmt.format(*args))


def set_trace(enabled=True):
    logger.level = TRACE if enabled else logging.WARNING


class RuleSet:
    def __init__(self, *rules, cache=None):
        self.rules = rules
        self.cache = cache or SimpleCache()

    def lookup(self, verb, typ, accept_missing=False):
        trace("lookup({!s}, {!r}): start", verb, typ)
        if typ is None:
            if not accept_missing:
                raise TypeError("Attempted to find {} for 'None'".format(verb))
            return self.fallback(verb=verb, typ=typ)

        action = self.cache.get(verb=verb, typ=typ)
        if action is not None:
            trace("lookup({!s}, {!r}): cached", verb, typ)
            return action

        forward = self.cache.in_flight(verb=verb, typ=typ)

        try:
            for rule in self.rules:
                action = rule(verb=verb, typ=typ, ctx=self)
                if action is not None:
                    self.cache.complete(verb=verb, typ=typ, action=action)
                    trace("lookup({!s}, {!r}): computed", verb, typ)
                    return action

            trace("lookup({!s}, {!r}): fallback", verb, typ)
            action = self.fallback(verb=verb, typ=typ)
            if action is not None:
                self.cache.complete(verb=verb, typ=typ, action=action)
                trace("lookup({!s}, {!r}): computed by fallback", verb, typ)
            return action
        finally:
            self.cache.de_flight(verb=verb, typ=typ, forward=forward)

    def fallback(self, verb, typ):
        pass
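# --- Usage sketch (illustrative, not part of the package) ---
# A minimal custom rule for the RuleSet above. A rule is any callable accepting
# verb/typ/ctx keywords that returns an action (a one-argument callable) or None to
# decline. The complex-number encoding and the rule name are invented for this sketch;
# as examples/flags.py notes, lambdas are fine as long as you never pickle the actions.
from json_syntax import std_ruleset, JSON2PY, PY2JSON


def complex_as_pair(verb, typ, ctx):
    if typ is not complex:
        return None
    if verb == PY2JSON:
        return lambda value: [value.real, value.imag]
    elif verb == JSON2PY:
        return lambda value: complex(value[0], value[1])


rules = std_ruleset(extras=(complex_as_pair,))
encode = rules.lookup(verb=PY2JSON, typ=complex)
decode = rules.lookup(verb=JSON2PY, typ=complex)
assert encode(1 + 2j) == [1.0, 2.0]
assert decode([1.0, 2.0]) == 1 + 2j
# set_trace() from json_syntax.ruleset drops the module logger to the TRACE level so
# each lookup step is logged while debugging rule resolution.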
===== json_syntax/std.py =====

from .helpers import (
    has_origin,
    get_origin,
    issub_safe,
    NoneType,
    JSON2PY,
    PY2JSON,
    INSP_JSON,
    INSP_PY,
)
from .action_v1 import (
    check_collection,
    check_float,
    check_isinst,
    check_has_type,
    check_mapping,
    check_optional,
    check_parse_error,
    check_str_enum,
    convert_collection,
    convert_date,
    convert_datetime,
    convert_enum_str,
    convert_float,
    convert_mapping,
    convert_none,
    convert_optional,
    convert_str_enum,
    convert_time,
)

from collections import OrderedDict
from datetime import datetime, date, time
from decimal import Decimal
from enum import Enum
from functools import partial
from typing import Union

"""
These are standard rules to handle various types.

All rules take a verb, a Python type and a context, which is generally a RuleSet. A
rule returns a conversion function for that verb.
"""


def atoms(verb, typ, ctx):
    "Rule to handle atoms on both sides."
    if issub_safe(typ, (str, int, NoneType)):
        if verb in (JSON2PY, PY2JSON):
            if typ is NoneType:
                return convert_none
            for base in (str, bool, int):
                if issubclass(typ, base):
                    return base
        elif verb == INSP_PY:
            for base in (NoneType, str, bool, int):
                if issubclass(typ, base):
                    return partial(check_isinst, typ=base)
        elif verb == INSP_JSON:
            for base in (NoneType, str, bool, int):
                if issubclass(typ, base):
                    return partial(check_isinst, typ=base)


def floats(verb, typ, ctx):
    """
    Rule to handle floats, passing NaNs through unaltered.

    JSON technically recognizes integers and floats. Many JSON generators will
    represent floats with integral value as integers. Thus, this rule will convert both
    integers and floats in JSON to floats in Python.

    Python's standard JSON libraries treat `nan` and `inf` as special constants, but
    this is not standard JSON. This rule simply treats them as regular float values. If
    you want to catch them, you can set ``allow_nan=False`` in ``json.dump()``.
    """
    if issub_safe(typ, float):
        if verb in (JSON2PY, PY2JSON):
            return float
        elif verb == INSP_PY:
            return partial(check_isinst, typ=float)
        elif verb == INSP_JSON:
            return partial(check_isinst, typ=(int, float))


def floats_nan_str(verb, typ, ctx):
    """
    Rule to handle floats, passing NaNs through as strings.

    Python's standard JSON libraries treat `nan` and `inf` as special constants, but
    this is not standard JSON. This rule converts special constants to string names.
    """
    if issub_safe(typ, float):
        if verb == JSON2PY:
            return float
        elif verb == PY2JSON:
            return convert_float
        elif verb == INSP_PY:
            return partial(check_isinst, typ=float)
        elif verb == INSP_JSON:
            return check_float


def decimals(verb, typ, ctx):
    """
    Rule to handle decimals natively.

    This rule requires that your JSON library has decimal support, e.g. simplejson.
    Other JSON processors may convert values to and from floating-point; if that's a
    concern, consider `decimals_as_str`.

    This rule will fail if passed a special constant.
    """
    if issub_safe(typ, Decimal):
        if verb in (JSON2PY, PY2JSON):
            return Decimal
        elif verb in (INSP_JSON, INSP_PY):
            return partial(check_isinst, typ=Decimal)


def decimals_as_str(verb, typ, ctx):
    """
    Rule to handle decimals as strings.

    This rule bypasses JSON library decimal support, e.g. simplejson.

    This rule will fail if passed a special constant.
    """
    if issub_safe(typ, Decimal):
        if verb == JSON2PY:
            return Decimal
        elif verb == PY2JSON:
            return str
        elif verb == INSP_PY:
            return partial(check_isinst, typ=Decimal)
        elif verb == INSP_JSON:
            return partial(check_parse_error, parser=Decimal, error=ArithmeticError)


def iso_dates(verb, typ, ctx):
    """
    Rule to handle ISO formatted datetimes, dates and times.

    This is the strict variant that simply uses the `fromisoformat` and `isoformat`
    methods of `date` and `datetime`. There is a loose variant in the examples that
    will accept a datetime in a date field; a datetime field always accepts both dates
    and datetimes.
    """
    if typ not in (date, datetime, time):
        return
    if verb == PY2JSON:
        return typ.isoformat
    elif verb == INSP_PY:
        return partial(check_has_type, typ=typ)
    elif verb in (JSON2PY, INSP_JSON):
        if typ == date:
            parse = convert_date
        elif typ == datetime:
            parse = convert_datetime
        elif typ == time:
            parse = convert_time
        else:
            return
        if verb == JSON2PY:
            return parse
        else:
            return partial(
                check_parse_error, parser=parse, error=(TypeError, ValueError)
            )


def enums(verb, typ, ctx):
    "Rule to convert between enumerated types and strings."
    if issub_safe(typ, Enum):
        if verb == PY2JSON:
            return partial(convert_enum_str, typ=typ)
        elif verb == JSON2PY:
            return partial(convert_str_enum, mapping=dict(typ.__members__))
        elif verb == INSP_PY:
            return partial(check_isinst, typ=typ)
        elif verb == INSP_JSON:
            return partial(check_str_enum, mapping=frozenset(typ.__members__.keys()))


def faux_enums(verb, typ, ctx):
    "Rule to fake an Enum by actually using strings."
    if issub_safe(typ, Enum):
        if verb in (JSON2PY, PY2JSON):
            mapping = {name: name for name in typ.__members__}
            return partial(convert_str_enum, mapping=mapping)
        elif verb in (INSP_JSON, INSP_PY):
            return partial(check_str_enum, mapping=frozenset(typ.__members__.keys()))


def optional(verb, typ, ctx):
    """
    Handle an ``Optional[inner]`` by passing ``None`` through.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON):
        return
    if has_origin(typ, Union, num_args=2):
        if NoneType not in typ.__args__:
            return
        inner = None
        for arg in typ.__args__:
            if arg is not NoneType:
                inner = arg
        if inner is None:
            raise TypeError("Could not find inner type for Optional: " + str(typ))
    else:
        return
    inner = ctx.lookup(verb=verb, typ=inner)
    if verb in (JSON2PY, PY2JSON):
        return partial(convert_optional, inner=inner)
    elif verb in (INSP_JSON, INSP_PY):
        return partial(check_optional, inner=inner)


def lists(verb, typ, ctx):
    """
    Handle a ``List[type]`` or ``Tuple[type, ...]``.

    Trivia: the ellipsis indicates a homogeneous tuple; ``Tuple[A, B, C]`` is a product
    type that contains exactly those elements.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON):
        return
    if has_origin(typ, list, num_args=1):
        (inner,) = typ.__args__
    elif has_origin(typ, tuple, num_args=2):
        (inner, ell) = typ.__args__
        if ell is not Ellipsis:
            return
    else:
        return
    inner = ctx.lookup(verb=verb, typ=inner)
    con = list if verb in (PY2JSON, INSP_JSON) else get_origin(typ)
    if verb in (JSON2PY, PY2JSON):
        return partial(convert_collection, inner=inner, con=con)
    elif verb in (INSP_JSON, INSP_PY):
        return partial(check_collection, inner=inner, con=con)


def sets(verb, typ, ctx):
    """
    Handle a ``Set[type]`` or ``FrozenSet[type]``.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON):
        return
    if not has_origin(typ, (set, frozenset), num_args=1):
        return
    (inner,) = typ.__args__
    con = list if verb in (PY2JSON, INSP_JSON) else get_origin(typ)
    inner = ctx.lookup(verb=verb, typ=inner)
    if verb in (JSON2PY, PY2JSON):
        return partial(convert_collection, inner=inner, con=con)
    elif verb in (INSP_JSON, INSP_PY):
        return partial(check_collection, inner=inner, con=con)


def _stringly(verb, typ, ctx):
    """
    Rule to handle types that reliably convert directly to strings.

    This is used internally by dicts.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON) or not issub_safe(
        typ, (int, str, date, Enum)
    ):
        return
    for base in str, int:
        if issubclass(typ, base):
            if verb in (JSON2PY, PY2JSON):
                return base
            elif verb in (INSP_JSON, INSP_PY):
                return partial(check_isinst, typ=base)
    for rule in enums, iso_dates:
        action = rule(verb=verb, typ=typ, ctx=ctx)
        if action is not None:
            return action


def dicts(verb, typ, ctx):
    """
    Handle a ``Dict[key, value]`` where key is a string, integer or enum type.
    """
    if verb not in (JSON2PY, PY2JSON, INSP_PY, INSP_JSON):
        return
    if not has_origin(typ, (dict, OrderedDict), num_args=2):
        return
    (key_type, val_type) = typ.__args__
    key_type = _stringly(verb=verb, typ=key_type, ctx=ctx)
    if key_type is None:
        return
    val_type = ctx.lookup(verb=verb, typ=val_type)
    if verb in (JSON2PY, PY2JSON):
        return partial(convert_mapping, key=key_type, val=val_type, con=get_origin(typ))
    elif verb in (INSP_JSON, INSP_PY):
        return partial(check_mapping, key=key_type, val=val_type, con=get_origin(typ))

===== json_syntax/unions.py =====

from .helpers import has_origin, JSON2PY, PY2JSON, INSP_JSON, INSP_PY
from .action_v1 import convert_union, check_union

from functools import partial
from typing import Union


def unions(verb, typ, ctx):
    """
    Handle undiscriminated unions of the form ``Union[A, B, C, D]`` by inspecting the
    inner types one by one.

    This is the "implicit discriminant" technique, exploiting the fact that Python
    already tags all values with their type.

    A potential problem is that the JSON structure may not retain that information. So
    another rule could attempt to add a discriminant to the JSON data. For example, if
    you had two ``attrs`` style classes, they could add a `type` field with the class
    name. As there are many ways to do that, this rule doesn't attempt to pick one for
    you.

    Note: The optional rule handles the common case of ``Union[T, NoneType]`` more
    efficiently, so it should come before this one.
    """
    if has_origin(typ, Union):
        if verb in (JSON2PY, PY2JSON):
            if verb == PY2JSON:
                check_verb = INSP_PY
            elif verb == JSON2PY:
                check_verb = INSP_JSON
            else:
                return
            steps = []
            for arg in typ.__args__:
                check = ctx.lookup(verb=check_verb, typ=arg)
                convert = ctx.lookup(verb=verb, typ=arg)
                steps.append((check, convert, "<{!s}>".format(arg)))
            return partial(convert_union, steps=steps, typename=repr(typ))
        elif verb in (INSP_JSON, INSP_PY):
            steps = []
            for arg in typ.__args__:
                check = ctx.lookup(verb=verb, typ=arg)
                steps.append((check, "<{!s}>".format(arg)))
            return partial(check_union, steps=steps)
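# --- Usage sketch (illustrative, not part of the package) ---
# How the unions rule above dispatches on the runtime (or JSON-side) type. The Union
# members chosen here are arbitrary; the expected results are inferred from the rules
# rather than quoted from documentation.
from datetime import date
from typing import Union

from json_syntax import std_ruleset, JSON2PY, PY2JSON

rules = std_ruleset()
encode = rules.lookup(verb=PY2JSON, typ=Union[int, date])
decode = rules.lookup(verb=JSON2PY, typ=Union[int, date])

assert encode(date(2019, 4, 1)) == "2019-04-01"  # checked with inspect_python, then converted
assert encode(7) == 7
assert decode("2019-04-01") == date(2019, 4, 1)  # the inspect_json check picks the matching member
assert decode(7) == 7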
===== json_syntax-0.2.0.dist-info/LICENSE =====

MIT License

Copyright (c) 2019 Ben Samuel

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to the following
conditions:

The above copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
THE USE OR OTHER DEALINGS IN THE SOFTWARE.
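# --- Usage sketch (illustrative, not part of the package) ---
# An end-to-end round trip pulling together the attrs_classes, lists, atoms and
# iso_dates rules from the modules above. The attrs_classes rule also accepts
# dataclasses, so this sketch uses only the standard library; the class and field
# names are invented for illustration.
from dataclasses import dataclass, field
from datetime import date
from typing import List

from json_syntax import std_ruleset, JSON2PY, PY2JSON


@dataclass
class Account:
    owner: str
    opened: date
    tags: List[str] = field(default_factory=list)


rules = std_ruleset()
encode = rules.lookup(verb=PY2JSON, typ=Account)
decode = rules.lookup(verb=JSON2PY, typ=Account)

doc = encode(Account("alice", date(2019, 4, 1), ["vip"]))
# -> {'owner': 'alice', 'opened': '2019-04-01', 'tags': ['vip']}
obj = decode({"owner": "bob", "opened": "2019-04-01"})
# -> Account(owner='bob', opened=date(2019, 4, 1), tags=[])  (missing fields use defaults)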