PK!jsonmask/__init__.pyfrom .mask import apply_json_mask, should_include_variable from .parsing import parse_fields __project__ = 'jsonmask' __version__ = '0.1.0' VERSION = "{0} v{1}".format(__project__, __version__) PK!$s jsonmask/mask.pyimport sys from .parsing import parse_fields if sys.version_info[0] == 2: # pylint: disable=E0602 string_types = (basestring,) else: string_types = (str,) def apply_json_mask(data, json_mask, is_negated=False, depth=1, max_depth=None): if max_depth and depth >= max_depth: raise ValueError('Too much nested data!') if isinstance(json_mask, string_types): json_mask = parse_fields(json_mask) allowed_data = {} for key, subdata in data.items(): if should_include_variable(key, json_mask, is_negated=is_negated): # Terminal data if not isinstance(subdata, dict): allowed_data[key] = subdata continue next_json_mask = json_mask.get(key, {}) # Dead ends in the mask indicate that want # everything nested below this if not next_json_mask: allowed_data[key] = subdata continue allowed_data.setdefault(key, {}) allowed_data[key].update( apply_json_mask( subdata, next_json_mask, is_negated=is_negated, depth=depth + 1, max_depth=max_depth, ), ) return allowed_data def is_structure_wildcard(structure): return len(structure) == 1 and list(structure.keys())[0] == '*' def should_include_variable(path, structure, is_negated=False): """Determine inclusion of variable at path given parsed jsonmask. :path: str Something like "services.categories" :structure: dict Nested structure whose keys may correlate with the dotted path's tokens :is_negated: dict If True, represents `?excludes` instead of `?fields` :returns: bool """ if not structure: return True return ( do_fields_allow(path, structure) if not is_negated else not do_excludes_forbid(path, structure) ) def do_fields_allow(path, structure): path = path.split('.') struct = structure.copy() is_allowed = True for key in path: if not is_allowed: break is_wildcard = is_structure_wildcard(struct) if not is_wildcard and struct and key not in struct: is_allowed = False else: if is_wildcard: struct = struct['*'] else: struct = struct.get(key, {}) return is_allowed def do_excludes_forbid(path, structure): path = path.split('.') struct = structure.copy() is_forbidden = True for index, key in enumerate(path, start=1): if not is_forbidden: break is_wildcard = is_structure_wildcard(struct) if not is_wildcard and struct and key not in struct: is_forbidden = False else: if is_wildcard: struct = struct['*'] else: struct = struct.get(key, {}) # Only on the last pass of the loop, take a gander and what # came next. If we're defining excluded sub-fields of this attribute # then we obviously want some other fields of this attribute, and # thus we want this attribute if index == len(path): if struct: return bool(is_structure_wildcard(struct)) return is_forbidden PK!(RVVjsonmask/parsing.pyfrom __future__ import unicode_literals TERMINALS = ['(', ')', ',', '/'] def maybe_add_word(name, tokens): if name: tokens.append(name) name = '' return name, tokens def tokenize_partial_response(text): tokens = [] name = '' if not text: return tokens for ch in text: if ch in TERMINALS: name, tokens = maybe_add_word(name, tokens) tokens.append(ch) else: name += ch name, tokens = maybe_add_word(name, tokens) return tokens def parse_partial_response(tokens): return _parse_partial_response(tokens, {}, []) def _parse_partial_response(tokens, parent, stack): parent = parent.copy() props = {} while True: if not tokens: return parent token = tokens.pop(0) if token not in TERMINALS: stack.append(token) resp = _parse_partial_response( tokens, props.get(token, {}), stack, ) props[token] = resp stack.pop() elif token == ',': return props elif token == '/': stack.append(token) continue elif token == '(': stack.append(token) continue elif token == ')': return props parent.update(props) if stack and stack[-1] in ['/']: stack.pop() return parent return parent def parse_fields(text): if not text: return None return parse_partial_response( tokens=tokenize_partial_response(text), ) PK!ޙPb""jsonmask/tests/__init__.py"""Unit tests for the package.""" PK!ov,WWjsonmask/tests/conftest.py"""Unit tests configuration file.""" import log def pytest_configure(config): """Disable verbose output when running tests.""" log.init(debug=True) terminal = config.pluginmanager.getplugin('terminal') base = terminal.TerminalReporter class QuietReporter(base): """Reporter that only shows dots when running tests.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.verbosity = 0 self.showlongtestinfo = False self.showfspath = False terminal.TerminalReporter = QuietReporter PK!͞Cjsonmask/tests/test_mask.py"""Sample unit test module using pytest-describe and expecter.""" # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,singleton-comparison import logging from jsonmask import mask def test_apply_json_mask(): print('omg') ORIGINAL = 'original' EMPTY = 'empty' tests = [ # (data, mask, result, result_when_mask_negated,) # 0 ({'a': 1}, 'a', ORIGINAL, EMPTY,), ({'a': 1}, 'b', EMPTY, ORIGINAL,), ({'a': 1}, 'a/b', ORIGINAL, ORIGINAL,), # Drilling into terminal-values is a no-op ({'a': {'b': 1}}, 'a', ORIGINAL, EMPTY,), ({'a': {'b': 1}}, 'b', EMPTY, ORIGINAL,), # 5 ({'a': {'b': 1}}, 'a/b', ORIGINAL, {'a': {}},), ({'a': {'b': 1}}, 'a/c', {'a': {}}, ORIGINAL,), ({'a': {'b': 1}, 'b': {'asdf': 2}}, 'a,b/c', {'a': {'b': 1}, 'b': {}}, {'b': {'asdf': 2}},), ({'a': {'b': 1}, 'b': {'asdf': 2}}, 'a,b/asdf', ORIGINAL, {'b': {}},), ({'a': {'b': 1}, 'b': {'asdf': 2}}, 'a,c/c', {'a': {'b': 1}}, {'b': {'asdf': 2}},), ] for index, (data, _mask, expected_result, expected_result_when_negated,) in enumerate(tests, start=1): if expected_result == ORIGINAL: expected_result = data.copy() if expected_result == EMPTY: expected_result = {} if expected_result_when_negated == ORIGINAL: expected_result_when_negated = data.copy() if expected_result_when_negated == EMPTY: expected_result_when_negated = {} try: pruned_data = mask.apply_json_mask(data, _mask) assert pruned_data == expected_result except Exception as e: logging.exception('Encountered %s on include test %s', type(e).__name__, index) raise e try: pruned_data = mask.apply_json_mask(data, _mask, is_negated=True) assert pruned_data == expected_result_when_negated except Exception as e: logging.exception('Encountered %s on exclude test %s', type(e).__name__, index) raise e def test_inclusion_resolver(): tests = [ # ( # path, includes/exclude, # expected_include_result, expected_exclude_result, # ) # 0 ('a', {}, True, True,), ('a.b', {}, True, True,), ('a', {'a': {}}, True, False,), ('a.b', {'a': {}}, True, False,), ('a.b', {'a': {'c': {}}}, False, True,), # 5 ('a', {'a': {'b': {}}}, True, True,), ('a.b', {'a': {'b': {}}}, True, False,), ('a', {'a': {'*': {}}}, True, False,), ('a.b', {'a': {'*': {}}}, True, False,), ('a.b.c', {'a': {'*': {}}}, True, False,), # 10 ('a.b.c', {'a': {'*': {'d': {}}}}, False, True,), ('a.b.c', {'b': {}}, False, True,), ('a.b.c', {'a': {'b': {'d': {}}}}, False, True,), ('a.b.d', {'a': {'b': {'d': {}}}}, True, False,), ] for index, (path, structure, result_as_include, result_as_exclude,) in enumerate(tests, start=1): wrong_include_value = 'False' if result_as_include else 'True' error_msg = 'Incorrectly returned {} on include test for path `{}` on test {}'.format( wrong_include_value, path, index, ) assert mask.should_include_variable(path, structure) == result_as_include, error_msg wrong_exclude_value = 'False' if result_as_exclude else 'True' error_msg = 'Incorrectly returned {} on exclude test for path `{}` on test {}'.format( wrong_exclude_value, path, index, ) assert mask.should_include_variable(path, structure, is_negated=True) == result_as_exclude, error_msg PK!ojsonmask/tests/test_parsing.py"""Sample unit test module using pytest-describe and expecter.""" # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,singleton-comparison import logging from expecter import expect from jsonmask import parsing def test_multiple_builds(): tests = [ { 'fields': 'a,b', 'mask': {'a': {}, 'b': {}}, }, { 'fields': 'a/b', 'mask': {'a': {'b': {}}}, }, { 'fields': 'a/b,c', 'mask': {'a': {'b': {}}, 'c': {}}, }, { 'fields': 'a/b,a/c', 'mask': {'a': {'b': {}, 'c': {}}}, }, { 'fields': 'a/b,a/c,c', 'mask': {'a': {'b': {}, 'c': {}}, 'c': {}}, }, { 'fields': 'a/b/z,a/c,c', 'mask': {'a': {'b': {'z': {}}, 'c': {}}, 'c': {}}, }, { 'fields': 'a(b)', 'mask': {'a': {'b': {}}}, }, { 'fields': 'a(b,c)', 'mask': {'a': {'b': {}, 'c': {}}}, }, { 'fields': 'a(b,c/d)', 'mask': {'a': {'b': {}, 'c': {'d': {}}}}, }, { 'fields': 'z,a(b,c/d)', 'mask': {'a': {'b': {}, 'c': {'d': {}}}, 'z': {}}, }, { 'fields': 'a(b,c/d),z', 'mask': {'a': {'b': {}, 'c': {'d': {}}}, 'z': {}}, }, { 'fields': 'a(b,c/d(x,y,z))', 'mask': {'a': {'b': {}, 'c': {'d': {'x': {}, 'y': {}, 'z': {}}}}}, }, { 'fields': 'a(b,c/d(x,y,z),abc(xyz/z(a)))', 'mask': {'a': {'b': {}, 'c': {'d': {'x': {}, 'y': {}, 'z': {}}, 'abc': {'xyz': {'z': {'a': {}}}}}}}, }, { 'fields': 'a(b,c/d(x,y,z)),abc(xyz/z(a))', 'mask': {'a': {'b': {}, 'c': {'d': {'x': {}, 'y': {}, 'z': {}}}}, 'abc': {'xyz': {'z': {'a': {}}}}}, }, ] for test in tests: error_msg = 'Failed to compile fields {}'.format(test['fields']) try: expect(test['mask']) == parsing.parse_fields(test['fields']) except Exception as e: logging.exception(e) raise e PK!HMWXjsonmask-0.1.0.dist-info/WHEEL A н#Z@Z|Jl~6蓅 Λ MU4[PYBpYD*Mͯ#ڪ/̚?ݭ'%nPK!H=Bs  !jsonmask-0.1.0.dist-info/METADATAV[O#7~8+*h2å-ҨPQaI U&^fl*{@Aݼ5ѱ;W9zrGuR]'Uy;n.ˢJ`\ Tށ#u#L!:`RV]̽7.Ly9.n$ڸ&WWM0ìnZΎKh8\fpg\ C *%58:܋p?ډb#:b#omvs-ŕ).0&d.=$]$JI=o2c86L OTޑQKY8*+)A'͟]ܓCǢcAS ۪~h֕_W>nmvNXizI5C_g*#kHnJ}&ۀ JB=c+46E͖#L+Pŋݘn+|ԭ-&$7K+o;5jJ?7a߀vנ8@Zxa)Կ8pOH0vssCKp"O1m!%Z]ԏӥi" YE*"]#M}i"Su P cǷ<qX2\܅a\,5ttr]l:]4M864}U*EƭD6{nm>ma5b헼47?|+4@4Ri%{%9 ki3(2`)oz3y +]fsKqs8xC88[4O9:׊Y '^?PK!H#'jsonmask-0.1.0.dist-info/RECORDur@} 7fEh!"6r\ZP~ƅd6Wu_qz.i͚0qME)0yt2En7cV={<9t{זW}tR:^޽ic1fȈ,~CMe?Yu1%HT45/`=x@-ziۙk8.T?S-_;mjlۑEY+:OO QpܧIVHyp6I|u}sT.,ce f(= Z4vJq讘%j!%%hdDnlS A"p$LPO& V&0Dz܌q3\-댼jld?PK!jsonmask/__init__.pyPK!$s jsonmask/mask.pyPK!(RVVjsonmask/parsing.pyPK!ޙPb""=jsonmask/tests/__init__.pyPK!ov,WWjsonmask/tests/conftest.pyPK!͞C&jsonmask/tests/test_mask.pyPK!o'jsonmask/tests/test_parsing.pyPK!HMWX0jsonmask-0.1.0.dist-info/WHEELPK!H=Bs  !0jsonmask-0.1.0.dist-info/METADATAPK!H#'4jsonmask-0.1.0.dist-info/RECORDPK 6