PKÂN&Q$,$,perky/__init__.py#!/usr/bin/env python3 # # Part of the "perky" Python library # Copyright 2018 by Larry Hastings # TODO: # # turn if 0 module-level tests into real tests, dude # # explicit fns for xform schema vs function # * need betterer names # # Per-line callback function (for #include) # * and, naturally, an example callback function # for you to use (aka "#include") # # More library utility functions to manage # perky dict/lists: # * recursive "chain map" # * recursive merge # # Ensure you can use multiple Required objects # with the same function (e.g. "int") # # transform exceptions should print a breadcrumb # trail so we know where the erroneous value lives # TESTS NEEDED: # # this should fail: # a = ''' # outdenting is fun # ''' # # make sure a quoted # works as a key # # ensure that unquoted string names can contain # [ { ''' """ # # and unquoted string values can contain all those AND # = """ A simple, Pythonic file format. Same interface as the "pickle" module (load, loads, dump, dumps). """ __version__ = "0.1.2" import ast import re import shlex import sys import textwrap from .tokenize import * from .utility import * class PerkyFormatError(Exception): pass def assert_or_raise(*exprs): exprs = list(exprs) s = exprs.pop() if not all(exprs): raise PerkyFormatError(s) class Parser: def __init__(self, s): self.lp = LineParser(s) def assert_or_raise(self, *exprs): exprs = list(exprs) s = exprs.pop() if not all(exprs): raise PerkyFormatError(f"Line {self.lp.line_number}: {s}") def _parse_value(self, t): tok, value = t if tok is LEFT_CURLY_BRACE: return self._read_dict() if tok is LEFT_SQUARE_BRACKET: return self._read_list() if (tok is TRIPLE_SINGLE_QUOTE) or (tok is TRIPLE_DOUBLE_QUOTE): return self._read_textblock(value) return value def _read_dict(self): d = {} for tokens in self.lp: if len(tokens) == 1 and tokens[0][0] is RIGHT_CURLY_BRACE: break self.assert_or_raise( len(tokens) == 3 and tokens[0][0] is STRING and tokens[1][0] is EQUALS, "Invalid token sequence: in dict, expected STRING = VALUE or }, line = " + repr(self.lp.line)) name = tokens[0][1].strip() value = self._parse_value(tokens[2]) d[name] = value return d def _read_list(self): l = [] for tokens in self.lp: self.assert_or_raise( len(tokens) == 1, "Invalid token sequence: in list, expected one token, line = " + repr(self.lp.line)) token = tokens[0] if token[0] is RIGHT_SQUARE_BRACKET: break value = self._parse_value(token) l.append(value) return l def _read_textblock(self, marker): l = [] while self.lp: line = self.lp.next_line().rstrip() stripped = line.lstrip() if stripped == marker: break l.append(line) prefix = line.partition(stripped)[0] if prefix: # detect this error: # a = ''' # outdenting is fun # ''' for line in l: self.assert_or_raise( # line must either be empty or start with our prefix (not line) or line.startswith(prefix), "Format error: malformed line triple-quoted block, line is " + repr(line)) s = "\n".join(line for line in l) # this one line does all the # heavy lifting in textwrap.dedent() s = re.sub(r'(?m)^' + prefix, '', s) return s class Serializer: def __init__(self, prefix=" "): self.prefix = prefix self.reset() def reset(self): self.indent = 0 self.lines = [] self.line = '' def dumps(self): s = "\n".join(self.lines) self.reset() return s def newline(self, s): line = self.line self.line = '' if s: line = line + s if self.indent: line = (self.indent * self.prefix) + line self.lines.append(line) @staticmethod def quoted_string(s): return shlex.quote(s) def serialize(self, d): for name, value in d.items(): if not isinstance(name, str): raise RuntimeError("keys in perky dicts must always be strings!") if name == name.strip() and "".join(name.split()).isalnum(): self.line = self.quoted_string(name) else: self.line = name self.line += " = " self.serialize_value(value) def serialize_dict(self, value): self.newline("{") self.indent += 1 self.serialize(value) self.newline("}") self.indent -= 1 def serialize_list(self, l): self.newline("[") self.indent += 1 for value in l: self.serialize_value(value) self.newline("]") self.indent -= 1 def serialize_quoted_string(self, s): self.newline(self.quoted_string(s)) def serialize_textblock(self, s): self.newline('"""') self.indent += 1 for line in s.split("\n"): self.newline(line) self.newline('"""') self.indent -= 1 def serialize_value(self, value): if isinstance(value, dict): return self.serialize_dict(value) if isinstance(value, list): return self.serialize_list(value) value = str(value) if '\n' in value: return self.serialize_textblock(value) if value == value.strip() and "".join(value.split()).isalnum(): self.newline(value) return return self.serialize_quoted_string(value) def loads(s): p = Parser(s) d = p._read_dict() return d def dumps(d): s = Serializer() s.serialize(d) return s.dumps() def load(filename, encoding="utf-8"): with open(filename, "rt", encoding=encoding) as f: return loads(f.read()) def dump(filename, d, encoding="utf-8"): with open(filename, "wt", encoding=encoding) as f: f.write(serialize(d)) if 0: text = """ a = b c = d dict = { inner1=value1 inner 2 = " value 2 " list = [ a b c ] } list = [ 1 2 3 ] text = ''' hello this is indented etc. ''' """ d = loads(text) print(d) print(serialize(d)) def map(o, fn): if isinstance(o, dict): return {name: map(value, fn) for name, value in o.items()} if isinstance(o, list): return [map(value, fn) for value in o] return fn(o) def _transform(o, schema, default): if isinstance(schema, dict): assert_or_raise( isinstance(o, dict), f"schema mismatch: schema is a dict, o should be a dict but is {o!r}") result = {} for name, value in o.items(): handler = schema.get(name) if handler: value = _transform(value, handler, default) elif default: value = default(value) result[name] = value return result if isinstance(schema, list): assert_or_raise( isinstance(o, list), len(schema) == 1, f"schema mismatch: schema is a list, o should be a list but is {o!r}") handler = schema[0] return [_transform(value, handler, default) for value in o] assert_or_raise( callable(schema), f"schema mismatch: schema values must be dict, list, or callable, got {schema!r}") return schema(o) def transform(o, schema, default=None): assert_or_raise( isinstance(o, dict), "schema must be a dict") assert_or_raise( (not default) or callable(default), "default must be either None or a callable") return _transform(o, schema, default) constmap = { 'None': None, 'True': True, 'False': False, } def const(s): return constmap[s] def nullable(type): def fn(o): if o == 'None': return None return type(o) return fn class _AnnotateSchema: def __init__(self): self.reset() def reset(self): self.head = [] self.tail = [] def crawl(self, value, name=''): if isinstance(value, dict): self.head.append(name + "{") self.tail.append('}') d = value for name, value in d.items(): self.crawl(value, name) self.head.pop() self.tail.pop() return if isinstance(value, list): self.head.append(name + "[") self.tail.append(']') self.crawl(value[0]) self.head.pop() self.tail.pop() return assert_or_raise( callable(value), "Malformed schema error: " + repr(name) + " = " + repr(value) + ", value is not dict, list, or callable!") required = getattr(value, "_perky_required", None) if required: s = "".join(self.head) + name + "".join(reversed(self.tail)) required[0] = s required[1] = False class UnspecifiedRequiredValues(Exception): def __init__(self, breadcrumbs): self.breadcrumbs = breadcrumbs def __repr__(self): breadcrumbs = " ".join(shlex.quote(s) for s in self.breadcrumbs) return f"" def __str__(self): return repr(self) class Required: def __init__(self): self.markers = [] def annotate(self, schema): annotator = _AnnotateSchema() annotator.crawl(schema) def __call__(self, fn): marker = ['', False] self.markers.append(marker) def wrapper(o): marker[1] = True return fn(o) wrapper._perky_required = marker return wrapper def verify(self): failed = [] for breadcrumb, value in self.markers: if not value: failed.append(breadcrumb) if failed: failed.sort() raise UnspecifiedRequiredValues(failed) if 0: o = {'a': '3', 'b': '5.0', 'c': ['1', '2', 'None', '3'], 'd': { 'e': 'f', 'g': 'True'}} schema = {'a': int, 'b': float, 'c': [nullable(int)], 'd': { 'e': str, 'g': const }} result = transform(o, schema) import pprint pprint.pprint(result) print("REQUIRED 1") r = Required() schema = { 'a': r(int), 'b': r(float), 'c': [nullable(int)], 'd': { 'e': r(str), 'g': const } } r.annotate(schema) print("schema", schema) result = transform(o, schema) print(result) r.verify() print("REQUIRED 2") r.annotate(schema) o2 = {'a': '44'} result = transform(o2, schema) r.verify() PKNCl2((perky/tokenize.py# # tokenize.py # # Part of the "perky" Python library # Copyright 2018 by Larry Hastings # import ast import sys WHITESPACE = '' STRING = '' EQUALS = '' LEFT_CURLY_BRACE = '' RIGHT_CURLY_BRACE = '' LEFT_SQUARE_BRACKET = '' RIGHT_SQUARE_BRACKET = '' COMMENT = '' TRIPLE_SINGLE_QUOTE = "" TRIPLE_DOUBLE_QUOTE = '' c_to_token = { '=': EQUALS, '{': LEFT_CURLY_BRACE, '}': RIGHT_CURLY_BRACE, '[': LEFT_SQUARE_BRACKET, ']': RIGHT_SQUARE_BRACKET, '#': COMMENT, } c_to_triple_quote = { "'": TRIPLE_SINGLE_QUOTE, '"': TRIPLE_DOUBLE_QUOTE, } all_operators = set(c_to_token) all_operators.union(c_to_triple_quote) all_operators.add('=') all_operators.add('#') token_to_name = { WHITESPACE: "WHITESPACE", STRING: "STRING", EQUALS: "EQUALS", LEFT_CURLY_BRACE: "LEFT_CURLY_BRACE", RIGHT_CURLY_BRACE: "RIGHT_CURLY_BRACE", LEFT_SQUARE_BRACKET: "LEFT_SQUARE_BRACKET", RIGHT_SQUARE_BRACKET: "RIGHT_SQUARE_BRACKET", COMMENT: "COMMENT", TRIPLE_SINGLE_QUOTE: "TRIPLE_SINGLE_QUOTE", TRIPLE_DOUBLE_QUOTE: "TRIPLE_DOUBLE_QUOTE", } class pushback_str_iterator: def __init__(self, s): self.characters = list(reversed(s)) def __repr__(self): contents = "".join(reversed(self.characters)) return f'' def push(self, s): for c in s: self.characters.append(c) def __next__(self): if not self.characters: raise StopIteration() return self.characters.pop() def __iter__(self): return self def __bool__(self): return bool(self.characters) def drain(self): """ Return all remaining characters as a string. """ s = "".join(reversed(self.characters)) self.characters.clear() return s def tokenize(s, skip_whitespace=True): """ Iterator that yields tokens from a line. Handles two types of lines: * lines in a dict name = value * lines in a list value Each token is a tuple of length two: the first element is one of the predefined tokens at the top of this file, and the second is the "value" of that token (e.g if the token is STRING, value is the value of that string). """ i = pushback_str_iterator(s) def parse_unquoted_string(): """ Parse an unquoted string. In Perky, this is a string without quote marks, but *with* spaces. Returns the unquoted string parsed. If there were no characters to be read, returns an empty string. Stops the unquoted string at EOL, or the first character used in Perky syntax (=, {, [, etc). (If you need to use one of those inside your string, use a quoted string.) """ buffer = [] for c in i: if c in all_operators: i.push(c) break buffer.append(c) return "".join(buffer).rstrip() def parse_quoted_string(quote): """ Parse a quoted string. The ending quote must match the starting quote character passed in. Handles all the Python escape sequences: all the single-character ones, octal, and the extra-special x u U N ones. """ buffer = [quote] backslash = False for c in i: if backslash: backslash = False elif c == '\\': backslash = True continue buffer.append(c) if c == quote: break try: return ast.literal_eval("".join(buffer)) except SyntaxError as e: raise e def flush(): t = "".join(token) token.clear() return t whitespace = None for c in i: token = [c] if c.isspace(): # whitespace for c in i: if not c.isspace(): i.push(c) break token.append(c) if skip_whitespace: token.clear() else: yield WHITESPACE, flush() continue tok = c_to_token.get(c, None) if tok: if tok == COMMENT: yield COMMENT, i.drain() return yield tok, flush() continue tok = c_to_triple_quote.get(c, None) if tok: # it's a quote character, but is it a triple-quote? is_triple_quote = False if i: c2 = next(i) if i: c3 = next(i) is_triple_quote = c == c2 == c3 if not is_triple_quote: i.push(c3) i.push(c2) else: i.push(c2) if is_triple_quote: # triple quote must be last thing on line (except maybe ignored trailing whitespace) trailing = i.drain() if trailing and not trailing.isspace(): raise RuntimeError("tokenizer: found triple-quote followed by non-whitespace string " + repr(trailing)) yield tok, c*3 if trailing: yield WHITESPACE, trailing return yield STRING, parse_quoted_string(c) continue i.push(c) s = parse_unquoted_string() yield STRING, s class LineParser: def __init__(self, s, skip_whitespace=True): self._lines = s.split("\n") self.lines = enumerate(self._lines) self.skip_whitespace = skip_whitespace def __repr__(self): return f"" def __iter__(self): return self def __bool__(self): return bool(self.lines) def next_line(self): line_number, line = next(self.lines) self.line_number = line_number return line def tokens(self): while self.lines: line = self.line = self.next_line() l = list(tokenize(line, skip_whitespace=self.skip_whitespace)) if l: return l if l is None: return None def __next__(self): while True: t = self.tokens() if t: return t if t is None: raise StopIteration() if __name__ == "__main__": want_print = False # want_print = True def test(s, *tokens_and_values): tokens = [] values = [] tokens_with_values = set((STRING, COMMENT)) expect_token = True for t in tokens_and_values: is_token = token_to_name.get(t) if expect_token: assert is_token, "expected token, got " + str(t) tokens.append(t) if t in tokens_with_values: expect_token = False else: values.append(t) expect_token = True if want_print: print("test input:\n\t", s, "\nshould match:\n\t", " ".join(x if x in token_to_name else repr(x) for x in tokens_and_values)) for tok, s in tokenize(s, skip_whitespace=False): if want_print: print(" >>", tok, repr(s)) t = tokens.pop(0) if tok != t: sys.exit("token doesn't match, expected " + str(token_to_name[t]) + " got " + str(token_to_name.get(tok))) if tok in tokens_with_values: v = values.pop(0) if v != s: sys.exit("token value doesn't match, expected " + repr(v) + " got " + repr(s)) if want_print: print() test(r"hey party people ", STRING, "hey party people") test(r" hey party people ", WHITESPACE, STRING, "hey party people") test(r"# hey party people ", COMMENT, " hey party people ") test(r" # hey party people ", WHITESPACE, COMMENT, " hey party people ") test(r""" "quoted \\u1234 string" """, WHITESPACE, STRING, "quoted \u1234 string", WHITESPACE) test(r""" "quoted \\N{END OF LINE} string" """, WHITESPACE, STRING, "quoted \n string", WHITESPACE) test(r""" "quoted string" = value """, WHITESPACE, STRING, "quoted string", WHITESPACE, EQUALS, WHITESPACE, STRING, "value", WHITESPACE) test(r""" "quoted string"=value """, WHITESPACE, STRING, "quoted string", EQUALS, STRING, "value", WHITESPACE) test(r""" "quoted string"={""", WHITESPACE, STRING, "quoted string", EQUALS, LEFT_CURLY_BRACE) test(r""" "quoted string" = {""", WHITESPACE, STRING, "quoted string", WHITESPACE, EQUALS, WHITESPACE, LEFT_CURLY_BRACE) test(r""" "quoted string"=[""", WHITESPACE, STRING, "quoted string", EQUALS, LEFT_SQUARE_BRACKET) test(r""" "quoted string" = [""", WHITESPACE, STRING, "quoted string", WHITESPACE, EQUALS, WHITESPACE, LEFT_SQUARE_BRACKET) test(r"x=y", STRING, "x", EQUALS, STRING, "y") test(r"x={", STRING, "x", EQUALS, LEFT_CURLY_BRACE) test(r"x=[", STRING, "x", EQUALS, LEFT_SQUARE_BRACKET) test(r'''x="quoted string"''', STRING, "x", EQUALS, STRING, "quoted string") # and now, the big finish test(r""" 'quoted string' "quoted string 2" [ { = "quoted value" [ { ] } = "yes!" [{}] ''' """, WHITESPACE, STRING, "quoted string", WHITESPACE, STRING, "quoted string 2", WHITESPACE, LEFT_SQUARE_BRACKET, WHITESPACE, LEFT_CURLY_BRACE, WHITESPACE, EQUALS, WHITESPACE, STRING, "quoted value", WHITESPACE, LEFT_SQUARE_BRACKET, WHITESPACE, LEFT_CURLY_BRACE, WHITESPACE, RIGHT_SQUARE_BRACKET, WHITESPACE, RIGHT_CURLY_BRACE, WHITESPACE, EQUALS, WHITESPACE, STRING, "yes!", WHITESPACE, LEFT_SQUARE_BRACKET, LEFT_CURLY_BRACE, RIGHT_CURLY_BRACE, RIGHT_SQUARE_BRACKET, WHITESPACE, TRIPLE_SINGLE_QUOTE, WHITESPACE, ) PKmMU۩$ $ perky/utility.py#!/usr/bin/env python3 # # Part of the "perky" Python library # Copyright 2018 by Larry Hastings class RecursiveChainMap(dict): def __init__(self, *dicts): self.cache = {} self.maps = [self.cache] self.maps.extend(dicts) self.deletes = set() def __missing__(self, key): raise KeyError(key) def __getitem__(self, key): if key in self.deletes: raise self.__missing__(key) submaps = [] for map in self.maps: try: # "key in dict" doesn't work with defaultdict! value = map[key] if isinstance(value, dict): submaps.append(value) elif not submaps: return value except KeyError: continue if not submaps: raise self.__missing__(key) value = RecursiveChainMap(*submaps) self.cache[key] = value return value def __setitem__(self, key, value): self.cache[key] = value self.deletes.discard(key) def __delitem__(self, key, value): if key in self.deletes: self.__missing__(key) self.deletes.add(key) __sentinel = object() def get(self, key, default=__sentinel): if key in self: return key[self] if default is not __sentinel: return default raise self.__missing__(key) def __len__(self): return len(set().union(*self.maps) - self.deletes) def __iter__(self): return iter(set().union(*self.maps) - self.deletes) def __contains__(self, key): if key in self.deletes: return False return any(key in map for map in self.maps) def __bool__(self): if not self.deletes: return any(self.maps) for map in self.maps: keys = set(map) - self.deletes if keys: return True def _merge_dicts(rcm): d = {} for key in rcm: value = rcm[key] if isinstance(value, RecursiveChainMap): value = _merge_dicts(value) d[key] = value return d def merge_dicts(*dicts): rcm = RecursiveChainMap(*dicts) return _merge_dicts(rcm) if __name__ == "__main__": dict1 = {'a': 1, 'sub': {1: 2, 3:4, 5:6}} dict2 = {'b': 2, 'sub': {2: 3, 4:5, 6:7}} rcm = RecursiveChainMap(dict1, dict2) print(rcm['a']) print(rcm['b']) sub = rcm['sub'] print([(name, sub[name]) for name in range(1, 7)]) d = merge_dicts(dict1, dict2) print(d) PKmMfperky-0.1.2.dist-info/LICENSEperky Copyright 2018 by Larry Hastings All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * Neither the name of the nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.PK!HPOperky-0.1.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!Hl:9perky-0.1.2.dist-info/METADATAW]o6}篸<1lg>Pkt,lɠE1%F"U#\Lk`&yy?= A.~QkkVzJFUkq5t릭՜>ag.ҵʺF%=6AJS{ۨE+п n,lsUKA\uǁoV\w0V#KY9Oo:SBȕ/^*?eIIߟT9-Gl;z 7On^$iO:3h 61oSt~) Ե=p)J+%CRy&xaM\hSmy`,XaB`S2noi/Nm3./K농/ǭgjYoǓf|*Ƨ 3 ٻ05֩RdM-C>8v`/z:+J&I.nxYO>4o؟q%zWydY C ЅPVQBcb',@$dM.G,$$0j\DDu'O e l,$^$&ĝQZsӞ*GE#5P b|nL=b ϭt[,CzlX@W`[W|Z2jCe .+QȭѾYѧ<,? q{ [/@ڴ]H^^p\EbS?-Hz$}t Q ҊУVdM~'NIr +"pדiG_ϞYu?[zA"cś!lZy_/Vo.>ENIŨ8C=SG>l?ܾDIK?5ʑc |t܀`U3vy~GpsrtZ|=ldh5Qrܽ^ cǸ3GSMp5Kl;@ 1@P 4dG"da3N|: $33`t@,۳Olp21oua(qgN cp˟w`Z f0L=XXj06<F-Js'FH(p= r2z:r"<W^$R)qhNfjdXU[kԘޟTAM87qj8G]"Su u!5ϟ) ,>C+x" fU-lbܳr6O:^StOn0;//W 9hX4 )5C؍;W{.s?&}ԀXy.#0Ĩmdfq"\!9;p&^}