PKȻL2''perky/__init__.py#!/usr/bin/env python3 # # Part of the "perky" Python library # Copyright 2018 by Larry Hastings # TODO: # # remove asserts # # remove tokens_match # # Per-line callback function (for #include) # * and, naturally, an example callback function # for you to use (aka "#include") # # More library utility functions to manage # perky dict/lists: # * A unary "transform" function--instead of a # whole schema, just a single function # * recursive "chain map" # * recursive merge # # Ensure you can use multiple Required objects # with the same function (e.g. "int") # TESTS NEEDED: # # this should fail: # a = ''' # outdenting is fun # ''' # # make sure a quoted # works as a key # # ensure that unquoted string names can contain # [ { ''' """ # # and unquoted string values can contain all those AND # = """ A simple, Pythonic file format. Same interface as the "pickle" module (load, loads, dump, dumps). """ __version__ = "0.1" import ast import re import shlex import sys import textwrap from .tokenize import * from .utility import * class PerkyFormatError(Exception): pass def _parse_value(t, lp): tok, value = t if tok == LEFT_CURLY_BRACE: return _read_dict(lp) if tok == LEFT_SQUARE_BRACKET: return _read_list(lp) if tok in (TRIPLE_SINGLE_QUOTE, TRIPLE_DOUBLE_QUOTE): return _read_textblock(lp, value) return value def _read_dict(lp): d = {} # print("read_dict start, lp", lp) for tokens in lp: # print("read_dict TOKENS", tokens) if tokens_match(tokens, RIGHT_CURLY_BRACE): break assert len(tokens) == 3 assert tokens[0][0] == STRING assert tokens[1][0] == EQUALS name = tokens[0][1].strip() value = _parse_value(tokens[2], lp) d[name] = value # print(f"NAME {name!r} = VALUE {value!r}") return d def _read_list(lp): l = [] for tokens in lp: # print("read_list TOKENS", tokens) if tokens_match(tokens, RIGHT_SQUARE_BRACKET): break assert len(tokens) == 1 token = tokens[0] value = _parse_value(token, lp) l.append(value) # print(f"VALUE {value!r}") return l def _read_textblock(lp, marker): l = [] # print("read_textblock start, marker", marker) while lp: line = lp.line().rstrip() stripped = line.lstrip() if stripped == marker: break l.append(line) prefix = line.partition(stripped)[0] if prefix: # detect this error: # a = ''' # outdenting is fun # ''' for line in l: if line.strip() and not line.startswith(prefix): # print("RAISING PerkyFormatError") raise PerkyFormatError("Text in triple-quoted block before left margin") s = "\n".join(line for line in l) # this one line does all the # heavy lifting in textwrap.dedent() s = re.sub(r'(?m)^' + prefix, '', s) # print("read_textblock returning", repr(s)) return s class Serializer: def __init__(self, prefix=" "): self.prefix = prefix self.reset() def reset(self): self.indent = 0 self.lines = [] self.line = '' def dumps(self): s = "\n".join(self.lines) self.reset() return s def newline(self, s): line = self.line self.line = '' if s: line = line + s if self.indent: line = (self.indent * self.prefix) + line self.lines.append(line) @staticmethod def quoted_string(s): return shlex.quote(s) def serialize(self, d): for name, value in d.items(): if not isinstance(name, str): raise RuntimeError("keys in perky dicts must always be strings!") if name == name.strip() and "".join(name.split()).isalnum(): self.line = self.quoted_string(name) else: self.line = name self.line += " = " self.serialize_value(value) def serialize_dict(self, value): self.newline("{") self.indent += 1 self.serialize(value) self.newline("}") self.indent -= 1 def serialize_list(self, l): self.newline("[") self.indent += 1 for value in l: self.serialize_value(value) self.newline("]") self.indent -= 1 def serialize_quoted_string(self, s): self.newline(self.quoted_string(s)) def serialize_textblock(self, s): self.newline('"""') self.indent += 1 for line in s.split("\n"): self.newline(line) self.newline('"""') self.indent -= 1 def serialize_value(self, value): if isinstance(value, dict): return self.serialize_dict(value) if isinstance(value, list): return self.serialize_list(value) value = str(value) if '\n' in value: return self.serialize_textblock(value) if value == value.strip() and "".join(value.split()).isalnum(): self.newline(value) return return self.serialize_quoted_string(value) def loads(s): lp = LineParser(s) d = _read_dict(lp) return d def dumps(d): s = Serializer() s.serialize(d) return s.dumps() def load(filename, encoding="utf-8"): with open(filename, "rt", encoding=encoding) as f: return loads(f.read()) def dump(filename, d, encoding="utf-8"): with open(filename, "wt", encoding=encoding) as f: f.write(serialize(d)) if 0: text = """ a = b c = d dict = { inner1=value1 inner 2 = " value 2 " list = [ a b c ] } list = [ 1 2 3 ] text = ''' hello this is indented etc. ''' """ d = loads(text) print(d) print(serialize(d)) constmap = { 'None': None, 'True': True, 'False': False, } def const(s): return constmap[s] def nullable(type): def fn(o): if o == 'None': return None return type(o) return fn def _transform_function(o, fn): if isinstance(o, dict): return {name: _transform_function(value, fn) for name, value in o.items()} if isinstance(o, list): return [_transform_function(value, fn) for value in o] return fn(o) def _transform_schema(o, schema): if isinstance(schema, dict): if not isinstance(o, dict): sys.exit("Schema mismatch, expected schema and o to both be dicts") newdict = {} for name, value in o.items(): handler = schema.get(name) if handler: value = _transform_schema(value, handler) newdict[name] = value return newdict if isinstance(schema, list): if not isinstance(o, list): sys.exit("Schema mismatch, expected schema and o to both be lists") assert len(schema) == 1 handler = schema[0] return [_transform_schema(value, handler) for value in o] return schema(o) def transform(o, transformation=ast.literal_eval): if callable(transformation): return _transform_function(o, transformation) return _transform_schema(o, transformation) class _AnnotateSchema: def __init__(self): self.reset() def reset(self): self.head = [] self.tail = [] def crawl(self, value, name=''): if isinstance(value, dict): self.head.append(name + "{") self.tail.append('}') d = value for name, value in d.items(): self.crawl(value, name) self.head.pop() self.tail.pop() return if isinstance(value, list): self.head.append(name + "[") self.tail.append(']') self.crawl(value[0]) self.head.pop() self.tail.pop() return assert callable(value), "value " + repr(value) + " is not callable!" required = getattr(value, "_perky_required", None) if required: s = "".join(self.head) + name + "".join(reversed(self.tail)) required[0] = s required[1] = False class UnspecifiedRequiredValues(Exception): def __init__(self, breadcrumbs): self.breadcrumbs = breadcrumbs def __repr__(self): breadcrumbs = " ".join(shlex.quote(s) for s in self.breadcrumbs) return f"" def __str__(self): return repr(self) class Required: def __init__(self): self.markers = [] def annotate(self, schema): annotator = _AnnotateSchema() annotator.crawl(schema) def __call__(self, fn): marker = ['', False] self.markers.append(marker) def wrapper(o): marker[1] = True return fn(o) wrapper._perky_required = marker return wrapper def verify(self): failed = [] for breadcrumb, value in self.markers: if not value: failed.append(breadcrumb) if failed: failed.sort() raise UnspecifiedRequiredValues(failed) if 0: o = {'a': '3', 'b': '5.0', 'c': ['1', '2', 'None', '3'], 'd': { 'e': 'f', 'g': 'True'}} schema = {'a': int, 'b': float, 'c': [nullable(int)], 'd': { 'e': str, 'g': const }} result = transform(o, schema) import pprint pprint.pprint(result) print("REQUIRED 1") r = Required() schema = { 'a': r(int), 'b': r(float), 'c': [nullable(int)], 'd': { 'e': r(str), 'g': const } } r.annotate(schema) print("schema", schema) result = transform(o, schema) print(result) r.verify() print("REQUIRED 2") r.annotate(schema) o2 = {'a': '44'} result = transform(o2, schema) r.verify() PKL[(P,,perky/tokenize.py# # tokenize.py # # Part of the "perky" Python library # Copyright 2018 by Larry Hastings # import ast import sys WHITESPACE = '' STRING = '' EQUALS = '' LEFT_CURLY_BRACE = '' RIGHT_CURLY_BRACE = '' LEFT_SQUARE_BRACKET = '' RIGHT_SQUARE_BRACKET = '' COMMENT = '' TRIPLE_SINGLE_QUOTE = "" TRIPLE_DOUBLE_QUOTE = '' c_to_token = { '=': EQUALS, '{': LEFT_CURLY_BRACE, '}': RIGHT_CURLY_BRACE, '[': LEFT_SQUARE_BRACKET, ']': RIGHT_SQUARE_BRACKET, '#': COMMENT, } c_to_triple_quote = { "'": TRIPLE_SINGLE_QUOTE, '"': TRIPLE_DOUBLE_QUOTE, } all_operators = set(c_to_token) all_operators.union(c_to_triple_quote) all_operators.add('=') all_operators.add('#') token_to_name = { WHITESPACE: "WHITESPACE", STRING: "STRING", EQUALS: "EQUALS", LEFT_CURLY_BRACE: "LEFT_CURLY_BRACE", RIGHT_CURLY_BRACE: "RIGHT_CURLY_BRACE", LEFT_SQUARE_BRACKET: "LEFT_SQUARE_BRACKET", RIGHT_SQUARE_BRACKET: "RIGHT_SQUARE_BRACKET", COMMENT: "COMMENT", TRIPLE_SINGLE_QUOTE: "TRIPLE_SINGLE_QUOTE", TRIPLE_DOUBLE_QUOTE: "TRIPLE_DOUBLE_QUOTE", } class pushback_str_iterator: def __init__(self, s): self.characters = list(reversed(s)) def __repr__(self): contents = "".join(reversed(self.characters)) return f'' def push(self, s): # print("PUSH ->", repr(s)) for c in s: self.characters.append(c) def __next__(self): # print("I -> ", self.characters) if not self.characters: raise StopIteration() return self.characters.pop() def __iter__(self): return self def __bool__(self): return bool(self.characters) def drain(self): """ Return all remaining characters as a string. """ s = "".join(reversed(self.characters)) self.characters.clear() return s def tokenize(s, skip_whitespace=True): """ Iterator that yields tokens from a line. Handles two types of lines: * lines in a dict name = value * lines in a list value Each token is a tuple of length two: the first element is one of the predefined tokens at the top of this file, and the second is the "value" of that token (e.g if the token is STRING, value is the value of that string). """ i = pushback_str_iterator(s) def parse_unquoted_string(): """ Parse an unquoted string. In Perky, this is a string without quote marks, but *with* spaces. Returns the unquoted string parsed. If there were no characters to be read, returns an empty string. Stops the unquoted string at EOL, or the first character used in Perky syntax (=, {, [, etc). (If you need to use one of those inside your string, use a quoted string.) """ buffer = [] for c in i: if c in all_operators: i.push(c) break buffer.append(c) return "".join(buffer).rstrip() def parse_quoted_string(quote): """ Parse a quoted string. The ending quote must match the starting quote character passed in. Handles all the Python escape sequences: all the single-character ones, octal, and the extra-special x u U N ones. """ buffer = [quote] backslash = False for c in i: if backslash: backslash = False elif c == '\\': backslash = True continue buffer.append(c) if c == quote: break try: return ast.literal_eval("".join(buffer)) except SyntaxError as e: # print("FAILED AT BUFFER", buffer) raise e def flush(): t = "".join(token) token.clear() return t whitespace = None for c in i: token = [c] if c.isspace(): # whitespace for c in i: if not c.isspace(): i.push(c) break token.append(c) if skip_whitespace: token.clear() else: yield WHITESPACE, flush() continue tok = c_to_token.get(c, None) if tok: if tok == COMMENT: yield COMMENT, i.drain() return yield tok, flush() continue tok = c_to_triple_quote.get(c, None) if tok: # it's a quote character, but is it a triple-quote? is_triple_quote = False if i: c2 = next(i) if i: c3 = next(i) is_triple_quote = c == c2 == c3 if not is_triple_quote: i.push(c3) i.push(c2) else: i.push(c2) if is_triple_quote: # triple quote must be last thing on line (except maybe ignored trailing whitespace) trailing = i.drain() if trailing and not trailing.isspace(): raise RuntimeError("tokenizer: found triple-quote followed by non-whitespace string " + repr(trailing)) yield tok, c*3 if trailing: yield WHITESPACE, trailing return yield STRING, parse_quoted_string(c) continue i.push(c) s = parse_unquoted_string() yield STRING, s class LineParser: def __init__(self, s, skip_whitespace=True): self._lines = s.split("\n") self.lines = enumerate(self._lines) self.skip_whitespace = skip_whitespace def __repr__(self): return f"" def __iter__(self): return self def __bool__(self): return bool(self.lines) def line(self): line_number, line = next(self.lines) self.line_number = line_number return line def tokens(self): while self.lines: line = self.line() # print("TOKENS 221 LINE", self.line_number, repr(line)) sys.stdout.flush() l = list(tokenize(line, skip_whitespace=self.skip_whitespace)) if l: # print("LINE_NUMBER", self.line_number, "TOKENS", l) # sys.stdout.flush() return l if l is None: return None # continue def __next__(self): while True: t = self.tokens() if t: # print("LP returning tokens", t) return t if t is None: # print("LP raising StopIteration") raise StopIteration() # continue def tokens_match(tokens, *t): """ tokens_match(tok, STRING, EQUALS, STRING) tok is a list, all subsequent arguments are tokens. returns True if the tok contains that list of tokens. (ignores the values of the tokens.) """ if len(tokens) == len(t): for tok, t2 in zip(tokens, t): t1, value = tok if t1 != t2: break else: return True return False if __name__ == "__main__": want_print = False # want_print = True def test(s, *tokens_and_values): tokens = [] values = [] tokens_with_values = set((STRING, COMMENT)) expect_token = True for t in tokens_and_values: # print("t", t, "expect_token", expect_token) is_token = token_to_name.get(t) if expect_token: assert is_token, "expected token, got " + str(t) tokens.append(t) if t in tokens_with_values: expect_token = False else: values.append(t) expect_token = True if want_print: print("test input:\n\t", s, "\nshould match:\n\t", " ".join(x if x in token_to_name else repr(x) for x in tokens_and_values)) for tok, s in tokenize(s, skip_whitespace=False): if want_print: print(" >>", tok, repr(s)) t = tokens.pop(0) if tok != t: sys.exit("token doesn't match, expected " + str(token_to_name[t]) + " got " + str(token_to_name.get(tok))) if tok in tokens_with_values: v = values.pop(0) if v != s: sys.exit("token value doesn't match, expected " + repr(v) + " got " + repr(s)) if want_print: print() test(r"hey party people ", STRING, "hey party people") test(r" hey party people ", WHITESPACE, STRING, "hey party people") test(r"# hey party people ", COMMENT, " hey party people ") test(r" # hey party people ", WHITESPACE, COMMENT, " hey party people ") test(r""" "quoted \\u1234 string" """, WHITESPACE, STRING, "quoted \u1234 string", WHITESPACE) test(r""" "quoted \\N{END OF LINE} string" """, WHITESPACE, STRING, "quoted \n string", WHITESPACE) test(r""" "quoted string" = value """, WHITESPACE, STRING, "quoted string", WHITESPACE, EQUALS, WHITESPACE, STRING, "value", WHITESPACE) test(r""" "quoted string"=value """, WHITESPACE, STRING, "quoted string", EQUALS, STRING, "value", WHITESPACE) test(r""" "quoted string"={""", WHITESPACE, STRING, "quoted string", EQUALS, LEFT_CURLY_BRACE) test(r""" "quoted string" = {""", WHITESPACE, STRING, "quoted string", WHITESPACE, EQUALS, WHITESPACE, LEFT_CURLY_BRACE) test(r""" "quoted string"=[""", WHITESPACE, STRING, "quoted string", EQUALS, LEFT_SQUARE_BRACKET) test(r""" "quoted string" = [""", WHITESPACE, STRING, "quoted string", WHITESPACE, EQUALS, WHITESPACE, LEFT_SQUARE_BRACKET) test(r"x=y", STRING, "x", EQUALS, STRING, "y") test(r"x={", STRING, "x", EQUALS, LEFT_CURLY_BRACE) test(r"x=[", STRING, "x", EQUALS, LEFT_SQUARE_BRACKET) test(r'''x="quoted string"''', STRING, "x", EQUALS, STRING, "quoted string") # and now, the big finish test(r""" 'quoted string' "quoted string 2" [ { = "quoted value" [ { ] } = "yes!" [{}] ''' """, WHITESPACE, STRING, "quoted string", WHITESPACE, STRING, "quoted string 2", WHITESPACE, LEFT_SQUARE_BRACKET, WHITESPACE, LEFT_CURLY_BRACE, WHITESPACE, EQUALS, WHITESPACE, STRING, "quoted value", WHITESPACE, LEFT_SQUARE_BRACKET, WHITESPACE, LEFT_CURLY_BRACE, WHITESPACE, RIGHT_SQUARE_BRACKET, WHITESPACE, RIGHT_CURLY_BRACE, WHITESPACE, EQUALS, WHITESPACE, STRING, "yes!", WHITESPACE, LEFT_SQUARE_BRACKET, LEFT_CURLY_BRACE, RIGHT_CURLY_BRACE, RIGHT_SQUARE_BRACKET, WHITESPACE, TRIPLE_SINGLE_QUOTE, WHITESPACE, ) PKLU۩$ $ perky/utility.py#!/usr/bin/env python3 # # Part of the "perky" Python library # Copyright 2018 by Larry Hastings class RecursiveChainMap(dict): def __init__(self, *dicts): self.cache = {} self.maps = [self.cache] self.maps.extend(dicts) self.deletes = set() def __missing__(self, key): raise KeyError(key) def __getitem__(self, key): if key in self.deletes: raise self.__missing__(key) submaps = [] for map in self.maps: try: # "key in dict" doesn't work with defaultdict! value = map[key] if isinstance(value, dict): submaps.append(value) elif not submaps: return value except KeyError: continue if not submaps: raise self.__missing__(key) value = RecursiveChainMap(*submaps) self.cache[key] = value return value def __setitem__(self, key, value): self.cache[key] = value self.deletes.discard(key) def __delitem__(self, key, value): if key in self.deletes: self.__missing__(key) self.deletes.add(key) __sentinel = object() def get(self, key, default=__sentinel): if key in self: return key[self] if default is not __sentinel: return default raise self.__missing__(key) def __len__(self): return len(set().union(*self.maps) - self.deletes) def __iter__(self): return iter(set().union(*self.maps) - self.deletes) def __contains__(self, key): if key in self.deletes: return False return any(key in map for map in self.maps) def __bool__(self): if not self.deletes: return any(self.maps) for map in self.maps: keys = set(map) - self.deletes if keys: return True def _merge_dicts(rcm): d = {} for key in rcm: value = rcm[key] if isinstance(value, RecursiveChainMap): value = _merge_dicts(value) d[key] = value return d def merge_dicts(*dicts): rcm = RecursiveChainMap(*dicts) return _merge_dicts(rcm) if __name__ == "__main__": dict1 = {'a': 1, 'sub': {1: 2, 3:4, 5:6}} dict2 = {'b': 2, 'sub': {2: 3, 4:5, 6:7}} rcm = RecursiveChainMap(dict1, dict2) print(rcm['a']) print(rcm['b']) sub = rcm['sub'] print([(name, sub[name]) for name in range(1, 7)]) d = merge_dicts(dict1, dict2) print(d) PK!HNOperky-0.1.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,zd&Y)r$[)T&UrPK!H 7{NL perky-0.1.dist-info/METADATAVQo6~88I i([ h.{(D$()vڧ/}w*B}Vkkf|L\ZͨQnSlܴu-vFuTjB۰FTJQi]-Ô.H\)sESX)*kW!4~vza.O+v%}fOcSAx+q"wyZjF/uKQ}iS>KgP>w lBi `PtׅR+dBtZBݫ?Λ{~ȵK',[$p~:L⨫8:7*-b!մ*(8Xs[Se$ݯǏfrg/i}hH{qI{t 9|VlPUvDSJ%Pݪߚ TY&;Zo7+@b Q_ZDS25Wt/Vl:Y }jӧַWi1< \^ń饱N]TҬmC PțF.xEX}i0a%zWyH̝I,$sYU[BUkT Et2![4~܄6hw12kI(#_Jp$R-T`a%Z'3!FQcѠj5N{&T(E3¨Yj= c,@mX7$Ǟ`F:-%''W_:ﺸ{^/(`Xy:yYYɧUȟDoQTj#^ e@Bnn0(CAY[/0ԵiڐP^0VZzÒ]*ȯdQJPգNhGw_ǏHⲤmi#q6}=^ggX "?}K&b v"'|4 K!d?O({M& u]1H'|[W6adAnI{8Ǘ"fpFGS^mYSuصv&pc`lJА9ꔎ (fj%1j7" s7 8P k} I @Q-.zqV:Ǒ K'ַq'OqUt1UbڪJ&*ne`G>/B׾Obq'PHEFF3ǭ>hNA#ƒ3΀TEsq|5>YPK!H)perky-0.1.dist-info/RECORDmr0{%(ak8@ "H-° F;x#|紧,g#箐Th$Rש85Q˯`‹9tD/"TbÞU ![i!!p̢w|H,iuյc3P lkm#| &!+?SNJy@Ru/7ciYmt]R(ʄy!$vC@m*LbA<?J H[u||]xve8b7)ȩ!h"솁$Kz bPKȻL2''perky/__init__.pyPKL[(P,,'perky/tokenize.pyPKLU۩$ $ Tperky/utility.pyPK!HNOm^perky-0.1.dist-info/WHEELPK!H 7{NL ^perky-0.1.dist-info/METADATAPK!H)zdperky-0.1.dist-info/RECORDPKe