PK!jsonmask/__init__.pyfrom .mask import apply_json_mask, should_include_variable from .parsing import parse_fields __project__ = 'jsonmask' __version__ = '0.1.0' VERSION = "{0} v{1}".format(__project__, __version__) PK!;뗾jsonmask/mask.pyimport sys from .parsing import parse_fields if sys.version_info[0] == 2: # pylint: disable=E0602 string_types = (basestring,) else: string_types = (str,) def apply_json_mask(data, json_mask, is_negated=False, depth=1, max_depth=None): """Take a data structure and compiled JSON mask, and remove unwanted data. :data: The Python dictionary you want to prune :json_mask: The compiled jsonmask indicating which data to keep and which to discard :is_negated: If True, membership in the json_mask indicates removal instead of inclusion :depth: Recursion flag to maintain progress toward `max_depth` :max_depth: Integer that, if supplied, sets a maximum depth on the supplied `json_mask` :Returns: dict """ if max_depth and depth >= max_depth: raise ValueError('Too much nested data!') if isinstance(json_mask, string_types): json_mask = parse_fields(json_mask) allowed_data = {} for key, subdata in data.items(): if should_include_variable(key, json_mask, is_negated=is_negated): # Terminal data if not isinstance(subdata, dict): allowed_data[key] = subdata continue next_json_mask = json_mask.get(key, {}) # Dead ends in the mask indicate that want # everything nested below this if not next_json_mask: allowed_data[key] = subdata continue allowed_data.setdefault(key, {}) allowed_data[key].update( apply_json_mask( subdata, next_json_mask, is_negated=is_negated, depth=depth + 1, max_depth=max_depth, ), ) return allowed_data def is_structure_wildcard(structure): return len(structure) == 1 and list(structure.keys())[0] == '*' def should_include_variable(path, structure, is_negated=False): """Determine inclusion of variable at path given parsed jsonmask. :path: Something like "teacher.classes.students" :structure: Nested structure whose keys may correlate with the dotted path's tokens :is_negated: If True, represents `?excludes` instead of `?fields` :returns: bool """ if not structure: return True return ( do_fields_allow(path, structure) if not is_negated else not do_excludes_forbid(path, structure) ) def do_fields_allow(path, structure): path = path.split('.') struct = structure.copy() is_allowed = True for key in path: if not is_allowed: break is_wildcard = is_structure_wildcard(struct) if not is_wildcard and struct and key not in struct: is_allowed = False else: if is_wildcard: struct = struct['*'] else: struct = struct.get(key, {}) return is_allowed def do_excludes_forbid(path, structure): path = path.split('.') struct = structure.copy() is_forbidden = True for index, key in enumerate(path, start=1): if not is_forbidden: break is_wildcard = is_structure_wildcard(struct) if not is_wildcard and struct and key not in struct: is_forbidden = False else: if is_wildcard: struct = struct['*'] else: struct = struct.get(key, {}) # Only on the last pass of the loop, take a gander and what # came next. If we're defining excluded sub-fields of this attribute # then we obviously want some other fields of this attribute, and # thus we want this attribute if index == len(path): if struct: return bool(is_structure_wildcard(struct)) return is_forbidden PK!tggjsonmask/parsing.pyfrom __future__ import unicode_literals TERMINALS = ['(', ')', ',', '/'] def maybe_add_word(name, tokens): if name: tokens.append(name) name = '' return name, tokens def tokenize_partial_response(text): tokens = [] name = '' if not text: return tokens for ch in text: if ch in TERMINALS: name, tokens = maybe_add_word(name, tokens) tokens.append(ch) else: name += ch name, tokens = maybe_add_word(name, tokens) return tokens def parse_partial_response(tokens): return _parse_partial_response(tokens, {}, []) def _parse_partial_response(tokens, parent, stack): parent = parent.copy() props = {} while True: if not tokens: return parent token = tokens.pop(0) if token not in TERMINALS: stack.append(token) resp = _parse_partial_response( tokens, props.get(token, {}), stack, ) props[token] = resp stack.pop() elif token == ',': return props elif token == '/': stack.append(token) continue elif token == '(': stack.append(token) continue elif token == ')': return props parent.update(props) if stack and stack[-1] in ['/']: stack.pop() return parent return parent def parse_fields(text): """Turn a string jsonmask into a Python dictionary representing the desired data pruning. You will likely want to call ``jsonmask.mask.apply_json_mask``. :text: Plain text value representing desired structure, e.g., `a,b/c` :returns: dict """ if not text: return None return parse_partial_response( tokens=tokenize_partial_response(text), ) PK!ޙPb""jsonmask/tests/__init__.py"""Unit tests for the package.""" PK!ov,WWjsonmask/tests/conftest.py"""Unit tests configuration file.""" import log def pytest_configure(config): """Disable verbose output when running tests.""" log.init(debug=True) terminal = config.pluginmanager.getplugin('terminal') base = terminal.TerminalReporter class QuietReporter(base): """Reporter that only shows dots when running tests.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.verbosity = 0 self.showlongtestinfo = False self.showfspath = False terminal.TerminalReporter = QuietReporter PK!͞Cjsonmask/tests/test_mask.py"""Sample unit test module using pytest-describe and expecter.""" # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,singleton-comparison import logging from jsonmask import mask def test_apply_json_mask(): print('omg') ORIGINAL = 'original' EMPTY = 'empty' tests = [ # (data, mask, result, result_when_mask_negated,) # 0 ({'a': 1}, 'a', ORIGINAL, EMPTY,), ({'a': 1}, 'b', EMPTY, ORIGINAL,), ({'a': 1}, 'a/b', ORIGINAL, ORIGINAL,), # Drilling into terminal-values is a no-op ({'a': {'b': 1}}, 'a', ORIGINAL, EMPTY,), ({'a': {'b': 1}}, 'b', EMPTY, ORIGINAL,), # 5 ({'a': {'b': 1}}, 'a/b', ORIGINAL, {'a': {}},), ({'a': {'b': 1}}, 'a/c', {'a': {}}, ORIGINAL,), ({'a': {'b': 1}, 'b': {'asdf': 2}}, 'a,b/c', {'a': {'b': 1}, 'b': {}}, {'b': {'asdf': 2}},), ({'a': {'b': 1}, 'b': {'asdf': 2}}, 'a,b/asdf', ORIGINAL, {'b': {}},), ({'a': {'b': 1}, 'b': {'asdf': 2}}, 'a,c/c', {'a': {'b': 1}}, {'b': {'asdf': 2}},), ] for index, (data, _mask, expected_result, expected_result_when_negated,) in enumerate(tests, start=1): if expected_result == ORIGINAL: expected_result = data.copy() if expected_result == EMPTY: expected_result = {} if expected_result_when_negated == ORIGINAL: expected_result_when_negated = data.copy() if expected_result_when_negated == EMPTY: expected_result_when_negated = {} try: pruned_data = mask.apply_json_mask(data, _mask) assert pruned_data == expected_result except Exception as e: logging.exception('Encountered %s on include test %s', type(e).__name__, index) raise e try: pruned_data = mask.apply_json_mask(data, _mask, is_negated=True) assert pruned_data == expected_result_when_negated except Exception as e: logging.exception('Encountered %s on exclude test %s', type(e).__name__, index) raise e def test_inclusion_resolver(): tests = [ # ( # path, includes/exclude, # expected_include_result, expected_exclude_result, # ) # 0 ('a', {}, True, True,), ('a.b', {}, True, True,), ('a', {'a': {}}, True, False,), ('a.b', {'a': {}}, True, False,), ('a.b', {'a': {'c': {}}}, False, True,), # 5 ('a', {'a': {'b': {}}}, True, True,), ('a.b', {'a': {'b': {}}}, True, False,), ('a', {'a': {'*': {}}}, True, False,), ('a.b', {'a': {'*': {}}}, True, False,), ('a.b.c', {'a': {'*': {}}}, True, False,), # 10 ('a.b.c', {'a': {'*': {'d': {}}}}, False, True,), ('a.b.c', {'b': {}}, False, True,), ('a.b.c', {'a': {'b': {'d': {}}}}, False, True,), ('a.b.d', {'a': {'b': {'d': {}}}}, True, False,), ] for index, (path, structure, result_as_include, result_as_exclude,) in enumerate(tests, start=1): wrong_include_value = 'False' if result_as_include else 'True' error_msg = 'Incorrectly returned {} on include test for path `{}` on test {}'.format( wrong_include_value, path, index, ) assert mask.should_include_variable(path, structure) == result_as_include, error_msg wrong_exclude_value = 'False' if result_as_exclude else 'True' error_msg = 'Incorrectly returned {} on exclude test for path `{}` on test {}'.format( wrong_exclude_value, path, index, ) assert mask.should_include_variable(path, structure, is_negated=True) == result_as_exclude, error_msg PK!ojsonmask/tests/test_parsing.py"""Sample unit test module using pytest-describe and expecter.""" # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,singleton-comparison import logging from expecter import expect from jsonmask import parsing def test_multiple_builds(): tests = [ { 'fields': 'a,b', 'mask': {'a': {}, 'b': {}}, }, { 'fields': 'a/b', 'mask': {'a': {'b': {}}}, }, { 'fields': 'a/b,c', 'mask': {'a': {'b': {}}, 'c': {}}, }, { 'fields': 'a/b,a/c', 'mask': {'a': {'b': {}, 'c': {}}}, }, { 'fields': 'a/b,a/c,c', 'mask': {'a': {'b': {}, 'c': {}}, 'c': {}}, }, { 'fields': 'a/b/z,a/c,c', 'mask': {'a': {'b': {'z': {}}, 'c': {}}, 'c': {}}, }, { 'fields': 'a(b)', 'mask': {'a': {'b': {}}}, }, { 'fields': 'a(b,c)', 'mask': {'a': {'b': {}, 'c': {}}}, }, { 'fields': 'a(b,c/d)', 'mask': {'a': {'b': {}, 'c': {'d': {}}}}, }, { 'fields': 'z,a(b,c/d)', 'mask': {'a': {'b': {}, 'c': {'d': {}}}, 'z': {}}, }, { 'fields': 'a(b,c/d),z', 'mask': {'a': {'b': {}, 'c': {'d': {}}}, 'z': {}}, }, { 'fields': 'a(b,c/d(x,y,z))', 'mask': {'a': {'b': {}, 'c': {'d': {'x': {}, 'y': {}, 'z': {}}}}}, }, { 'fields': 'a(b,c/d(x,y,z),abc(xyz/z(a)))', 'mask': {'a': {'b': {}, 'c': {'d': {'x': {}, 'y': {}, 'z': {}}, 'abc': {'xyz': {'z': {'a': {}}}}}}}, }, { 'fields': 'a(b,c/d(x,y,z)),abc(xyz/z(a))', 'mask': {'a': {'b': {}, 'c': {'d': {'x': {}, 'y': {}, 'z': {}}}}, 'abc': {'xyz': {'z': {'a': {}}}}}, }, ] for test in tests: error_msg = 'Failed to compile fields {}'.format(test['fields']) try: expect(test['mask']) == parsing.parse_fields(test['fields']) except Exception as e: logging.exception(e) raise e PK!H9VWXjsonmask-0.1.1.dist-info/WHEEL A н#f."jm)!fb҅~ܴA,mTD}E n0H饹*|D[¬c i=0(q3PK!H[jO !jsonmask-0.1.1.dist-info/METADATAVmO#7_hvH+AK]\ UN6>k{C*{ǻ7ڐ<33k,gſuB&#+!V%sdi{*KfK#3.$^0IVk%j?ӊK +eqYϪIu~aFMWuD~Qܭ¢Wc;9f2Q+6;dBfS"OE8 2Ɖ[==9L{Q>obqX|X|sɜS+0MWf=1HPK?`nn`A琇=dapA-Vኩ2*p ܍<y([W9/7 28nu{hX7{D}O+ZͫpXT-;H=Iqpxyvq}f!׶&"ǭ0AA|G96~<>QrJȼk?wEY$n&@.:ͅJy+:qbobc.m.{)ϱym'Kx/2AvUZSΗX̴" dޠGBF+sq4o K)\k.\SR4N-Ӎx1hJ8(_^kw+n+ jߡݸkD|!6rݴ9f>"15d&qhKs$^tju8]Yւ`BK`w ]xCm|eS/Bv'Ikare˙4zSlig]1x6x4HVU.x`cVkxMOVa/M'F7|/$gz%?4([6G pB`d+-AOe4{qxCeC#G!F*fTVB]T(joKs6:|gg]d_Ht$S PK!H> jsonmask-0.1.1.dist-info/RECORD}n@}ŴȂ6s B1\؀R` ˖;z[jh:oq4B fwIZaݑC.}rbDI_|g+6%/ͺƃk˙1%XK% f_` Pwj+לʈXǏęFYma!hZ^S %8Tn܍3MrvOXYùo7WTk"m*4&+/@[.珥[5((>׌&/4798nI197y6 s?)F=#.4 e|R\̹*̪-R4M$Wԟ0)ڷ@WxcڬqlN5==1-b* SA7j)$Kts8Z{.Qg*0h06h54:X dWY2APK!jsonmask/__init__.pyPK!;뗾jsonmask/mask.pyPK!tggjsonmask/parsing.pyPK!ޙPb""jsonmask/tests/__init__.pyPK!ov,WWjsonmask/tests/conftest.pyPK!͞Cojsonmask/tests/test_mask.pyPK!oa*jsonmask/tests/test_parsing.pyPK!H9VWXN3jsonmask-0.1.1.dist-info/WHEELPK!H[jO !3jsonmask-0.1.1.dist-info/METADATAPK!H> 7jsonmask-0.1.1.dist-info/RECORDPK :