PK.Fopenfisca_parsers/__init__.pyPKW[F Z#Z#*openfisca_parsers/formulas_parsers_2to3.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Parsers for formula-specific lib2to3-based abstract syntax trees""" from __future__ import division import collections import inspect import itertools import lib2to3.pgen2.token import lib2to3.pygram import lib2to3.pytree import os import textwrap import numpy as np from openfisca_core import conv symbols = lib2to3.pygram.python_symbols # Note: symbols is a module. tokens = lib2to3.pgen2.token # Note: tokens is a module. type_symbol = lib2to3.pytree.type_repr # Note: type_symbol is a function. 
# Monkey patches to support utf-8 strings lib2to3.pytree.Base.__str__ = lambda self: unicode(self).encode('utf-8') lib2to3.pytree.Leaf.__unicode__ = lambda self: self.prefix.decode('utf-8') + (self.value.decode('utf-8') if isinstance(self.value, str) else unicode(self.value) ) # Abstract Wrappers class AbstractWrapper(object): container = None # The wrapper directly containing this wrapper hint = None # A wrapper that is the hinted type of this wrapper node = None # The lib2to3 node parser = None def __init__(self, container = None, hint = None, node = None, parser = None): if container is not None: assert isinstance(container, AbstractWrapper), "Invalid container {} for node:\n{}\n\n{}".format(container, repr(node), unicode(node).encode('utf-8')) self.container = container if hint is not None: assert isinstance(hint, AbstractWrapper), "Invalid hint {} for node:\n{}\n\n{}".format(hint, repr(node), unicode(node).encode('utf-8')) self.hint = hint if node is not None: assert isinstance(node, lib2to3.pytree.Base), "Invalid node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) self.node = node assert isinstance(parser, Parser), "Invalid parser {} for node:\n{}\n\n{}".format(parser, repr(node), unicode(node).encode('utf-8')) self.parser = parser @property def containing_class(self): container = self.container if container is None: return None return container.containing_class @property def containing_function(self): container = self.container if container is None: return None return container.containing_function @property def containing_module(self): container = self.container if container is None: return None return container.containing_module def guess(self, expected): assert issubclass(expected, AbstractWrapper) if isinstance(self, expected): return self if self.hint is not None: guessed = self.hint.guess(expected) if guessed is not None: return guessed return None # Level-1 Wrappers class AndExpression(AbstractWrapper): operands = None operator = None 
def __init__(self, container = None, hint = None, node = None, operands = None, operator = None, parser = None): super(AndExpression, self).__init__(container = container, hint = hint, node = node, parser = parser) assert isinstance(operands, list) self.operands = operands assert isinstance(operator, basestring) self.operator = operator def guess(self, expected): guessed = super(AndExpression, self).guess(expected) if guessed is not None: return guessed parser = self.parser if issubclass(parser.Array, expected): for operand in self.operands: array = operand.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Boolean( parser = parser, ), entity_class = array.entity_class, parser = parser, ) return None @classmethod def parse(cls, node, container = None, parser = None): assert node.type == symbols.and_expr, "Unexpected and expression type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) >= 3 and (len(children) & 1), \ "Unexpected length {} of children in and expression:\n{}\n\n{}".format(len(children), repr(node), unicode(node).encode('utf-8')) child_index = 0 operands = [] operator_symbol = None while child_index < len(children): child = children[child_index] operands.append(parser.parse_value(child, container = container)) child_index += 1 if child_index >= len(children): break operator = children[child_index] assert operator.type == tokens.AMPER, "Unexpected operator type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) if operator_symbol is None: operator_symbol = operator.value else: assert operator_symbol == operator.value, "Unexpected operator:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) child_index += 1 return cls(container = container, node = node, parser = parser, operands = operands, operator = operator_symbol) class AndTest(AbstractWrapper): operands = None operator = None def __init__(self, container = None, hint = None, node = None, operands 
= None, operator = None, parser = None): super(AndTest, self).__init__(container = container, hint = hint, node = node, parser = parser) assert isinstance(operands, list) self.operands = operands assert isinstance(operator, basestring) self.operator = operator def guess(self, expected): guessed = super(AndTest, self).guess(expected) if guessed is not None: return guessed parser = self.parser if issubclass(parser.Boolean, expected): return parser.Boolean( parser = parser, ) return None @classmethod def parse(cls, node, container = None, parser = None): assert node.type == symbols.and_test, "Unexpected and test type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) >= 3 and (len(children) & 1), \ "Unexpected length {} of children in and expression:\n{}\n\n{}".format(len(children), repr(node), unicode(node).encode('utf-8')) child_index = 0 operands = [] operator_symbol = None while child_index < len(children): child = children[child_index] operands.append(parser.parse_value(child, container = container)) child_index += 1 if child_index >= len(children): break operator = children[child_index] assert operator.type == tokens.NAME and operator.value == u'and', \ "Unexpected operator type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) if operator_symbol is None: operator_symbol = operator.value else: assert operator_symbol == operator.value, "Unexpected operator:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) child_index += 1 return cls(container = container, node = node, parser = parser, operands = operands, operator = operator_symbol) class ArithmeticExpression(AbstractWrapper): items = None def __init__(self, container = None, hint = None, items = None, node = None, parser = None): super(ArithmeticExpression, self).__init__(container = container, hint = hint, node = node, parser = parser) assert isinstance(items, list) assert len(items) >= 3 and (len(items) & 1) self.items = items def 
guess(self, expected): guessed = super(ArithmeticExpression, self).guess(expected) if guessed is not None: return guessed parser = self.parser if issubclass(parser.Array, expected): for index, item in enumerate(self.items): if index & 1: continue array = item.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Number( parser = parser, ), entity_class = array.entity_class, parser = parser, ) return None @classmethod def parse(cls, node, container = None, parser = None): assert node.type == symbols.arith_expr, "Unexpected arithmetic expression type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) >= 3 and (len(children) & 1), \ "Unexpected length {} of children in arithmetic expression:\n{}\n\n{}".format(len(children), repr(node), unicode(node).encode('utf-8')) child_index = 0 items = [] while child_index < len(children): child = children[child_index] items.append(parser.parse_value(child, container = container)) child_index += 1 if child_index >= len(children): break operator = children[child_index] assert operator.type in (tokens.MINUS, tokens.PLUS), "Unexpected operator type:\n{}\n\n{}".format( repr(node), unicode(node).encode('utf-8')) items.append(operator.value) child_index += 1 return cls(container = container, items = items, node = node, parser = parser) class Array(AbstractWrapper): """Wrapper for a NumPy array""" cell = None # Cell wrapper entity_class = None value = None # array value, as a numpy array def __init__(self, cell = None, container = None, entity_class = None, hint = None, node = None, parser = None, value = None): super(Array, self).__init__(container = container, hint = hint, node = node, parser = parser) if cell is not None: assert isinstance(cell, AbstractWrapper), "Unexpected value for cell: {} of type {}".format(cell, type(cell)) self.cell = cell if entity_class is not None: assert entity_class in 
parser.tax_benefit_system.entity_class_by_key_plural.itervalues(), \ "Unexpected value for entity class: {} of type {}".format(entity_class, type(entity_class)) self.entity_class = entity_class if value is not None: assert isinstance(value, np.ndarray), "Unexpected value for array: {} of type {}".format(value, type(value)) self.value = value # class ArrayLength(AbstractWrapper): # array = None # # def __init__(self, node, array = None, parser = None): # super(ArrayLength, self).__init__(node, parser = parser) # if array is not None: # self.array = array class Assert(AbstractWrapper): error = None test = None def __init__(self, container = None, error = None, hint = None, node = None, parser = None, test = None): super(Assert, self).__init__(container = container, hint = hint, node = node, parser = parser) if error is not None: assert isinstance(error, AbstractWrapper) self.error = error assert isinstance(test, AbstractWrapper) self.test = test @classmethod def parse(cls, node, container = None, parser = None): assert node.type == symbols.assert_stmt, "Unexpected assert type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) >= 2, "Unexpected length {} of children in assert:\n{}\n\n{}".format( len(children), repr(node), unicode(node).encode('utf-8')) assert_word = children[0] assert assert_word.type == tokens.NAME and assert_word.value == 'assert' test = parser.parse_value(children[1], container = container) # TODO # error = parser.parse_value(error, container = container) error = None return cls(container = container, error = error, node = node, parser = parser, test= test) class Assignment(AbstractWrapper): left = None operator = None right = None def __init__(self, container = None, hint = None, left = None, node = None, operator = None, parser = None, right = None): super(Assignment, self).__init__(container = container, hint = hint, node = node, parser = parser) assert isinstance(left, list) self.left = 
left assert isinstance(operator, basestring) self.operator = operator assert isinstance(right, list) self.right = right if len(left) == len(right) and operator == '=': for left_item, right_item in itertools.izip(left, right): if isinstance(left_item, parser.Variable): assert left_item is not right_item left_item.value = right_item @classmethod def parse(cls, node, container = None, parser = None): assert node.type == symbols.expr_stmt, "Unexpected assignement type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) == 3, "Unexpected length {} of children in assignment:\n{}\n\n{}".format( len(children), repr(node), unicode(node).encode('utf-8')) left, operator, right = children assert operator.type in (tokens.AMPEREQUAL, tokens.EQUAL, tokens.MINEQUAL, tokens.PLUSEQUAL, tokens.STAREQUAL), "Unexpected assignment operator:\n{}\n\n{}".format(repr(node), unicode(node).encode( 'utf-8')) # Right items must be parsed before left ones, to avoid reuse of left variables (for example in statements like: # period = period). 
right_items = [] if right.type == symbols.testlist_star_expr: assert operator.type == tokens.EQUAL right_children = right.children child_index = 0 while child_index < len(right_children): right_child = right_children[child_index] right_items.append(parser.parse_value(right_child, container = container)) child_index += 1 if child_index >= len(right_children): break assert right_children[child_index].type == tokens.COMMA child_index += 1 else: right_items.append(parser.parse_value(right, container = container)) left_items = [] if left.type == symbols.testlist_star_expr: assert operator.type == tokens.EQUAL left_children = left.children child_index = 0 while child_index < len(left_children): left_child = left_children[child_index] assert left_child.type == tokens.NAME variable = parser.Variable.parse(left_child, container = container, parser = parser) left_items.append(variable) container.variable_by_name[variable.name] = variable child_index += 1 if child_index >= len(left_children): break assert left_children[child_index].type == tokens.COMMA child_index += 1 elif left.type == symbols.power: left_items.append(parser.parse_power(left, container = container)) else: assert left.type == tokens.NAME, \ "Unexpected assignment left operand:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) variable = parser.Variable.parse(left, container = container, parser = parser, value = None if operator.type == tokens.EQUAL else container.get_variable(left.value, parser = parser)) left_items.append(variable) container.variable_by_name[variable.name] = variable return cls(container = container, left = left_items, node = node, operator = operator.value, parser = parser, right = right_items) class Attribute(AbstractWrapper): name = None subject = None def __init__(self, container = None, hint = None, name = None, node = None, parser = None, subject = None): super(Attribute, self).__init__(container = container, hint = hint, node = node, parser = parser) assert isinstance(name, 
basestring) self.name = name assert isinstance(subject, AbstractWrapper) self.subject = subject def guess(self, expected): guessed = super(Attribute, self).guess(expected) if guessed is not None: return guessed parser = self.parser if issubclass(parser.Boolean, expected): compact_node_wrapper = self.subject.guess(parser.CompactNode) if compact_node_wrapper is not None: child_json = compact_node_wrapper.value['children'][self.name] child_type = child_json['@type'] if child_type == u'Parameter' and child_json.get('format') == 'boolean': return parser.Boolean(parser = parser) elif issubclass(parser.CompactNode, expected): compact_node = self.subject.guess(parser.CompactNode) if compact_node is not None: child_json = compact_node.value['children'].get(self.name) if child_json is not None: child_type = child_json['@type'] if child_type == u'Node': return parser.CompactNode(is_reference = compact_node.is_reference, name = self.name, parent = compact_node, parser = parser, value = child_json) elif issubclass(parser.Date, expected): if self.name == 'date': period = self.subject.guess(parser.Period) if period is not None: return parser.Date(parser = parser) elif issubclass(parser.Entity, expected): if self.name == 'entity': holder = self.subject.guess(parser.Holder) if holder is not None: entity_class = parser.tax_benefit_system.entity_class_by_key_plural[holder.column.entity_key_plural] return parser.Entity(entity_class = entity_class, parser = parser) elif issubclass(parser.FormulaClass, expected): if self.name == '__class__': formula = self.subject.guess(parser.Formula) if formula is not None: return formula.formula_class elif issubclass(parser.Holder, expected): if self.name == 'holder': formula = self.subject.guess(parser.Formula) if formula is not None: return parser.Holder(column = formula.column, parser = parser) elif issubclass(parser.Instant, expected): if self.name == 'start': period = self.subject.guess(parser.Period) if period is not None: return 
parser.Instant(parser = parser) elif issubclass(parser.Number, expected): if self.name == 'count': entity = self.subject.guest(parser.Entity) if entity is not None: return parser.Number(parser = parser) compact_node_wrapper = self.subject.guess(parser.CompactNode) if compact_node_wrapper is not None: child_json = compact_node_wrapper.value['children'][self.name] child_type = child_json['@type'] if child_type == u'Parameter' and child_json.get('format') != 'boolean': return parser.Number(parser = parser) elif issubclass(parser.String, expected): if self.name == '__name__': formula_class = self.subject.guess(parser.FormulaClass) if formula_class is not None: return parser.String(parser = parser, value = parser.column.name) elif issubclass(parser.TaxScale, expected): compact_node_wrapper = self.subject.guess(parser.CompactNode) if compact_node_wrapper is not None: child_json = compact_node_wrapper.value['children'][self.name] child_type = child_json['@type'] if child_type == u'Scale': return parser.TaxScale(parser = parser) elif issubclass(parser.UniformDictionary, expected): if self.name == '_array_by_period': holder = self.subject.guess(parser.Holder) if holder is not None: column = holder.column entity_class = parser.tax_benefit_system.entity_class_by_key_plural[column.entity_key_plural] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.UniformDictionary( key = parser.Period( parser = parser, ), parser = parser, value = parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ), ) return None @classmethod def parse(cls, subject, node, container = None, parser = None): assert node.type == symbols.trailer, "Unexpected attribute type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) == 2, "Unexpected length {} of children in power attribute:\n{}\n\n{}".format( len(children), repr(node), unicode(node).encode('utf-8')) dot, 
attribute = children assert dot.type == tokens.DOT, "Unexpected dot type:\n{}\n\n{}".format(repr(dot), unicode(dot).encode('utf-8')) assert attribute.type == tokens.NAME, "Unexpected attribute type:\n{}\n\n{}".format(repr(attribute), unicode(attribute).encode('utf-8')) return cls(container = container, name = attribute.value, node = node, parser = parser, subject = subject) class Call(AbstractWrapper): keyword_argument = None named_arguments = None positional_arguments = None star_argument = None subject = None def __init__(self, container = None, hint = None, keyword_argument = None, named_arguments = None, node = None, parser = None, positional_arguments = None, star_argument = None, subject = None): super(Call, self).__init__(container = container, hint = hint, node = node, parser = parser) if keyword_argument is not None: assert isinstance(keyword_argument, AbstractWrapper) self.keyword_argument = keyword_argument if named_arguments is None: named_arguments = collections.OrderedDict() else: assert isinstance(named_arguments, collections.OrderedDict) self.named_arguments = named_arguments if positional_arguments is None: positional_arguments = [] else: assert isinstance(positional_arguments, list) self.positional_arguments = positional_arguments if star_argument is not None: assert isinstance(star_argument, AbstractWrapper) self.star_argument = star_argument assert isinstance(subject, AbstractWrapper) self.subject = subject function = subject.guess(parser.Function) if function is not None: function.parse_call(self) def guess(self, expected): guessed = super(Call, self).guess(expected) if guessed is not None: return guessed parser = self.parser function = self.subject.guess(parser.Function) if function is not None: if issubclass(parser.Array, expected): if function.name in (u'age_aine', u'age_en_mois_benjamin', u'nb_enf'): return parser.Array( cell = parser.Number( parser = parser, ), parser = parser, ) assert function.returns, "Function {} has no return 
statement".format(function.name) return function.returns[-1].guess(expected) if issubclass(parser.Array, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name in (u'and_', u'or_', u'xor_'): for argument in self.positional_arguments: array = argument.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Boolean( parser = parser, ), entity_class = array.entity_class, parser = parser, ) elif function.name in (u'floor', u'max_', u'min_', u'round_'): for argument in self.positional_arguments: array = argument.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Number( parser = parser, ), entity_class = array.entity_class, parser = parser, ) elif function.name == 'not_': assert len(self.positional_arguments) == 1 argument = self.positional_arguments[0] array = argument.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Boolean( parser = parser, ), entity_class = array.entity_class, parser = parser, ) else: method = self.subject.guess(parser.Attribute) if method is not None: if method.name in ('all', 'any'): return parser.Boolean( parser = parser, ) if method.name in ('any_by_roles', 'sum_by_entity'): assert len(self.positional_arguments) == 1, self.positional_arguments assert len(self.named_arguments) == 0, self.named_arguments variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ) if method.name in ('calculate', 'calculate_add', 'calculate_add_divide', 'calculate_divide', 'get_array'): assert 
len(self.positional_arguments) >= 1 variable_name_wrapper = self.positional_arguments[0].guess(parser.String) if variable_name_wrapper is None: cell_wrapper = None entity_class = None else: tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name_wrapper.value] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) entity_class = tax_benefit_system.entity_class_by_key_plural[column.entity_key_plural] return parser.Array( cell = cell_wrapper, entity_class = entity_class, parser = parser, ) if method.name in ('cast_from_entity_to_role', 'cast_from_entity_to_roles'): assert len(self.positional_arguments) >= 1 variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.Array( cell = cell_wrapper, entity_class = parser.person_class, parser = parser, ) if method.name == 'filter_role': assert len(self.positional_arguments) >= 1 variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ) elif issubclass(parser.Boolean, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == 'hasattr': return parser.Boolean(parser = parser) elif 
issubclass(parser.CompactNode, expected): method = self.subject.guess(parser.Attribute) if method is not None: if method.name == 'legislation_at': method_subject = method.subject if method_subject.guess(parser.Simulation): positional_arguments = self.positional_arguments assert len(positional_arguments) == 1, positional_arguments instant = positional_arguments[0].guess(parser.Instant) if instant is not None: named_arguments = self.named_arguments assert len(named_arguments) <= 1, named_arguments reference = named_arguments.get('reference') assert reference is None return parser.CompactNode( is_reference = bool(reference), parser = parser, value = parser.tax_benefit_system.legislation_json, ) elif issubclass(parser.Date, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == 'date': return parser.Date(parser = parser) elif issubclass(parser.DatedHolder, expected): method = self.subject.guess(parser.Attribute) if method is not None: if method.name in ('compute', 'compute_add', 'compute_add_divide', 'compute_divide'): assert len(self.positional_arguments) >= 1 variable_name_wrapper = self.positional_arguments[0].guess(parser.String) if variable_name_wrapper is None: column = None else: column = parser.tax_benefit_system.column_by_name[variable_name_wrapper.value] return parser.DatedHolder( column = column, parser = parser, ) elif issubclass(parser.Instant, expected): method = self.subject.guess(parser.Attribute) if method is not None: if method.name == 'offset': if method.subject.guess(parser.Instant) is not None: return parser.Instant(parser = parser) elif issubclass(parser.Number, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == 'len': return parser.Number(parser = parser) elif issubclass(parser.Period, expected): method = self.subject.guess(parser.Attribute) if method is not None: if method.name == 'offset': period = method.subject.guess(parser.Period) if period is 
not None: # period.offset(...) return parser.Period(parser = parser, unit = period.unit) elif method.name == 'period': if method.subject.guess(parser.Instant): # instant.period(...) assert len(self.positional_arguments) >= 1 unit = self.positional_arguments[0].guess(parser.String) if unit is not None: assert unit.value is not None, 'Missing value in unit string' return parser.Period(parser = parser, unit = unit.value) elif issubclass(parser.TaxScale, expected): method = self.subject.guess(parser.Attribute) if method is not None: if method.name == 'calc': if method.subject.guess(parser.Instant) is not None: return parser.TaxScale(parser = parser) elif issubclass(parser.UniformDictionary, expected): method = self.subject.guess(parser.Attribute) if method is not None: if method.name == 'split_by_roles': assert len(self.positional_arguments) == 1, self.positional_arguments assert len(self.named_arguments) <= 1, self.named_arguments variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.UniformDictionary( key = parser.Role( parser = parser, ), parser = parser, value = parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ), ) elif issubclass(parser.UniformIterator, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == 'sorted': assert len(self.positional_arguments) >= 1 argument = self.positional_arguments[0] uniform_iterator = argument.guess(expected) if uniform_iterator is not None: return uniform_iterator uniform_dictionary = argument.guess(parser.UniformDictionary) if uniform_dictionary is not None: return 
uniform_dictionary.guess(expected) else: method = self.subject.guess(parser.Attribute) if method is not None: if method.name == 'iterkeys': uniform_dictionary = method.subject.guess(parser.UniformDictionary) if uniform_dictionary is not None: return parser.UniformIterator( items = [uniform_dictionary.key], parser = parser, ) elif method.name == 'iteritems': uniform_dictionary = method.subject.guess(parser.UniformDictionary) if uniform_dictionary is not None: return parser.UniformIterator( items = [ uniform_dictionary.key, uniform_dictionary.value, ], parser = parser, ) elif method.name == 'itervalues': uniform_dictionary = method.subject.guess(parser.UniformDictionary) if uniform_dictionary is not None: return parser.UniformIterator( items = [uniform_dictionary.value], parser = parser, ) return None @classmethod def parse(cls, subject, node, container = None, parser = None): if node is None: children = [] elif node.type == symbols.arglist: children = node.children else: children = [node] keyword_argument = None named_arguments = collections.OrderedDict() positional_arguments = [] star_argument = None child_index = 0 while child_index < len(children): argument = children[child_index] if argument.type == symbols.argument: # Named argument argument_children = argument.children assert len(argument_children) == 3, "Unexpected length {} of children in argument:\n{}\n\n{}".format( len(argument_children), repr(argument), unicode(argument).encode('utf-8')) argument_name, equal, argument_value = argument_children assert argument_name.type == tokens.NAME, "Unexpected name type:\n{}\n\n{}".format(repr(argument_name), unicode(argument_name).encode('utf-8')) assert equal.type == tokens.EQUAL, "Unexpected equal type:\n{}\n\n{}".format(repr(equal), unicode(equal).encode('utf-8')) named_arguments[argument_name.value] = parser.parse_value(argument_value, container = container) else: # Positional argument if argument.type == tokens.STAR: child_index += 1 argument = 
children[child_index] if argument.type == tokens.STAR: child_index += 1 argument = children[child_index] keyword_argument = parser.parse_value(argument, container = container) else: star_argument = parser.parse_value(argument, container = container) else: positional_arguments.append(parser.parse_value(argument, container = container)) child_index += 1 if child_index >= len(children): break child = children[child_index] assert child.type == tokens.COMMA, "Unexpected comma type:\n{}\n\n{}".format(repr(child), unicode(child).encode('utf-8')) child_index += 1 return cls(container = container, keyword_argument = keyword_argument, named_arguments = named_arguments, node = node, parser = parser, positional_arguments = positional_arguments, star_argument = star_argument, subject = subject) class Class(AbstractWrapper): base_class_name = None name = None variable_by_name = None def __init__(self, base_class_name = None, container = None, name = None, node = None, parser = None, variable_by_name = None): super(Class, self).__init__(container = container, node = node, parser = parser) assert isinstance(base_class_name, basestring) self.base_class_name = base_class_name assert isinstance(name, basestring) self.name = name if variable_by_name is None: variable_by_name = collections.OrderedDict() else: assert isinstance(variable_by_name, collections.OrderedDict) self.variable_by_name = variable_by_name @property def containing_class(self): return self @classmethod def get_function_class(cls, parser = None): return parser.Function def get_variable(self, name, default = UnboundLocalError, parser = None): variable = self.variable_by_name.get(name, None) if variable is None: container = self.container if container is not None: return container.get_variable(name, default = default, parser = parser) # TODO: Handle class inheritance. 
if default is UnboundLocalError: raise KeyError("Undefined value for {}".format(name)) variable = default return variable @classmethod def parse(cls, node, container = None, parser = None): try: children = node.children assert len(children) == 7, len(children) assert children[0].type == tokens.NAME and children[0].value == 'class' assert children[1].type == tokens.NAME name = children[1].value assert children[2].type == tokens.LPAR and children[2].value == '(' assert children[3].type == tokens.NAME base_class_name = children[3].value assert children[4].type == tokens.RPAR and children[4].value == ')' assert children[5].type == tokens.COLON and children[5].value == ':' variable_by_name = collections.OrderedDict() self = cls(base_class_name = base_class_name, container = container, name = name, node = node, parser = parser, variable_by_name = variable_by_name) suite = children[6] assert suite.type == symbols.suite suite_children = suite.children assert len(suite_children) > 2, len(suite_children) assert suite_children[0].type == tokens.NEWLINE and suite_children[0].value == '\n' assert suite_children[1].type == tokens.INDENT and suite_children[1].value == ' ' for suite_child in itertools.islice(suite_children, 2, None): if suite_child.type == symbols.decorated: decorator = parser.Decorator.parse(suite_child, container = self, parser = parser) variable_by_name[decorator.decorated.name] = parser.Variable(container = self, name = decorator.name, parser = parser, value = decorator) elif suite_child.type == symbols.funcdef: function = cls.get_function_class(parser = parser).parse(suite_child, container = self, parser = parser) variable_by_name[function.name] = parser.Variable(container = self, name = function.name, parser = parser, value = function) elif suite_child.type == symbols.simple_stmt: assert len(suite_child.children) == 2, len(suite_child.children) expression = suite_child.children[0] assert expression.type in (symbols.expr_stmt, tokens.STRING), expression.type 
assert suite_child.children[1].type == tokens.NEWLINE and suite_child.children[1].value == '\n' elif suite_child.type == tokens.DEDENT: continue else: assert False, "Unexpected statement in class definition:\n{}\n\n{}".format(repr(suite_child), unicode(suite_child).encode('utf-8')) return self except: if node is not None: print "An exception occurred in node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) raise class ClassFileInput(AbstractWrapper): @classmethod def get_class_class(cls, parser = None): return parser.Class @classmethod def parse(cls, class_definition, parser = None): source_lines, line_number = inspect.getsourcelines(class_definition) source = textwrap.dedent(''.join(source_lines)) node = parser.driver.parse_string(source) assert node.type == symbols.file_input, "Unexpected file input type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) == 2 and children[0].type == symbols.classdef and children[1].type == tokens.ENDMARKER, \ "Unexpected node children in:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) python_module = inspect.getmodule(class_definition) if parser.country_package is not None: assert python_module.__file__.startswith(os.path.dirname(parser.country_package.__file__)), \ "Requested class is defined outside country_package:\n{}".format(source) module = parser.python_module_by_name.get(python_module.__name__) if module is None: parser.python_module_by_name[python_module.__name__] = module = parser.Module(node, python = python_module, parser = parser) self = cls(parser = parser) class_definition_class = self.get_class_class(parser = parser) try: return class_definition_class.parse(children[0], container = module, parser = parser) except: if node is not None: print "An exception occurred in node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) raise class CompactNode(AbstractWrapper): is_reference = True name = None parent = None # Parent 
class Comparison(AbstractWrapper):
    """Wrapper around a binary comparison (a lib2to3 ``comparison`` node with exactly 3 children)."""
    left = None  # Wrapper of the left operand
    operator = None  # Operator, as a string ("==", "in", "is not", etc)
    right = None  # Wrapper of the right operand

    def __init__(self, container = None, hint = None, left = None, node = None, operator = None, parser = None,
            right = None):
        super(Comparison, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(left, AbstractWrapper)
        assert isinstance(operator, basestring)
        assert isinstance(right, AbstractWrapper)
        self.left = left
        self.operator = operator
        self.right = right

    def guess(self, expected):
        """Guess the wrapper of type ``expected`` that this comparison evaluates to, or return None.

        When an array is acceptable, the first operand (left first, then right) that is an array or a
        dated holder gives its entity class to a boolean array. Otherwise, when a boolean is acceptable,
        a boolean is returned.
        """
        inherited_guess = super(Comparison, self).guess(expected)
        if inherited_guess is not None:
            return inherited_guess

        parser = self.parser
        if issubclass(parser.Array, expected):
            for operand in (self.left, self.right):
                for wrapper_class in (parser.Array, parser.DatedHolder):
                    typed_operand = operand.guess(wrapper_class)
                    if typed_operand is None:
                        continue
                    return parser.Array(
                        cell = parser.get_cell_wrapper(container = self.container, type = bool),
                        entity_class = typed_operand.entity_class,
                        parser = parser,
                        )
            return None
        if issubclass(parser.Boolean, expected):
            return parser.Boolean(
                parser = parser,
                )
        return None

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a Comparison wrapper from a ``comparison`` node: left operand, operator, right operand."""
        assert node.type == symbols.comparison, "Unexpected comparison type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        assert len(children) == 3, "Unexpected length {} of children in comparison:\n{}\n\n{}".format(len(children),
            repr(node), unicode(node).encode('utf-8'))
        left_node, operator_node, right_node = children

        # Operands are parsed in source order: left, then the operator is decoded, then right.
        left = parser.parse_value(left_node, container = container)

        operator_type = operator_node.type
        if operator_type == tokens.NAME:
            # Single-word operators.
            assert operator_node.value in ('in', 'is'), "Unexpected operator type:\n{}\n\n{}".format(repr(node),
                unicode(node).encode('utf-8'))
            operator_symbol = operator_node.value
        elif operator_type == symbols.comp_op:
            # Two-word operators.
            words = operator_node.children
            assert len(words) == 2, "Unexpected length {} of children in comp_op:\n{}\n\n{}".format(
                len(words), repr(node), unicode(node).encode('utf-8'))
            first_word, second_word = words
            symbol_by_words = {
                ('is', 'not'): 'is not',
                ('not', 'in'): 'not in',
                }
            both_are_names = first_word.type == tokens.NAME and second_word.type == tokens.NAME
            word_pair = (first_word.value, second_word.value)
            if both_are_names and word_pair in symbol_by_words:
                operator_symbol = symbol_by_words[word_pair]
            else:
                assert False, "Unexpected comp_op children:\n{}\n\n{}".format(repr(node),
                    unicode(node).encode('utf-8'))
        else:
            # Punctuation operators.
            assert operator_type in (
                tokens.EQEQUAL,
                tokens.GREATER,
                tokens.GREATEREQUAL,
                tokens.LESS,
                tokens.LESSEQUAL,
                tokens.NOTEQUAL,
                ), "Unexpected operator type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8'))
            operator_symbol = operator_node.value

        right = parser.parse_value(right_node, container = container)

        return cls(container = container, left = left, node = node, parser = parser, operator = operator_symbol,
            right = right)
class Enum(AbstractWrapper):
    """Wrapper of an enumeration."""
    value = None  # Stored as given, only when a value is provided to the constructor

    def __init__(self, container = None, node = None, parser = None, value = None):
        super(Enum, self).__init__(container = container, node = node, parser = parser)
        if value is None:
            # Keep the class-level default.
            return
        self.value = value

    def guess(self, expected):
        """Guess the wrapper of type ``expected`` for this enumeration, or return None.

        When a uniform iterator is acceptable, iterating over the enumeration yields couples made of
        a string (the item label) and a number (the item index).
        """
        inherited_guess = super(Enum, self).guess(expected)
        if inherited_guess is not None:
            return inherited_guess

        parser = self.parser
        if not issubclass(parser.UniformIterator, expected):
            return None

        item_label = parser.String(
            parser = parser,
            )
        item_index = parser.Number(
            parser = parser,
            )
        return parser.UniformIterator(
            items = [item_label, item_index],
            parser = parser,
            )
class For(AbstractWrapper):
    """Wrapper around a ``for`` statement (a lib2to3 ``for_stmt`` node without ``else`` clause).

    The loop variables are typed from the items of the iterator, then published in the variables of the
    container.
    """
    body = None  # Parsed suite of the loop, set by ``parse`` once the loop variables are known
    iterator = None  # Wrapper of the expression that is iterated
    variable_by_name = None  # Ordered dictionary: loop variable name => Variable wrapper

    def __init__(self, container = None, hint = None, iterator = None, node = None, body = None, parser = None,
            variable_by_name = None):
        super(For, self).__init__(container = container, hint = hint, node = node, parser = parser)
        if body is not None:
            assert isinstance(body, list)
            self.body = body
        assert isinstance(iterator, AbstractWrapper)
        self.iterator = iterator
        # Validate the variables before their first use, so that a missing argument is reported by this
        # assertion instead of an unrelated TypeError raised by len().
        assert isinstance(variable_by_name, collections.OrderedDict)

        guessed_iterator = iterator.guess(parser.UniformIterator)
        if guessed_iterator is None:
            # An iterator that can't be guessed is tolerated only for a single loop variable named "bar",
            # which is then typed as a tax scale.
            if len(variable_by_name) == 1 and 'bar' in variable_by_name:
                guessed_iterator = parser.UniformIterator(
                    container = container,
                    items = [
                        parser.TaxScale(
                            container = container,
                            parser = parser,
                            ),
                        ],
                    parser = parser,
                    )
            else:
                assert False, "{} has no iterator".format(iterator)

        # Give to each loop variable the wrapper of the matching item of the iterator.
        for variable, value in itertools.izip(variable_by_name.itervalues(), guessed_iterator.items):
            variable.value = value
        self.variable_by_name = variable_by_name
        # Loop variables belong to the scope of the container.
        container.variable_by_name.update(variable_by_name)

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a For wrapper from a ``for_stmt`` node: ``for <variables> in <iterator>: <body>``."""
        assert node.type == symbols.for_stmt, "Unexpected for statement type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        assert len(children) == 6, "Unexpected length {} of children in for statement:\n{}\n\n{}".format(
            len(children), repr(node), unicode(node).encode('utf-8'))
        for_word, variables, in_word, iterator, colon, body = children
        assert for_word.type == tokens.NAME and for_word.value == 'for'

        variable_by_name = collections.OrderedDict()
        if variables.type == symbols.exprlist:
            # Several loop variables: names at even positions, commas at odd positions.
            for child_index, child in enumerate(variables.children):
                if child_index % 2:
                    assert child.type == tokens.COMMA, "Unexpected comma type:\n{}\n\n{}".format(repr(child),
                        unicode(child).encode('utf-8'))
                else:
                    assert child.type == tokens.NAME
                    variable_by_name[child.value] = parser.Variable(container = container, name = child.value,
                        parser = parser)
        elif variables.type == tokens.NAME:
            # Single loop variable.
            variable_name = variables.value
            variable_by_name[variable_name] = parser.Variable(container = container, name = variable_name,
                parser = parser)
        else:
            assert False, "Unexpected variables in for statement:\n{}\n\n{}".format(repr(node),
                unicode(node).encode('utf-8'))

        assert in_word.type == tokens.NAME and in_word.value == 'in'
        iterator = parser.parse_value(iterator, container = container)
        assert colon.type == tokens.COLON and colon.value == ':'

        # The node is given to the wrapper, like every other wrapper built by a parse method.
        self = cls(container = container, iterator = iterator, node = node, parser = parser,
            variable_by_name = variable_by_name)
        # The body is parsed last, once the loop variables are typed and visible from the container.
        self.body = parser.parse_suite(body, container = container)
        return self
variable_by_name = collections.OrderedDict() else: assert isinstance(variable_by_name, collections.OrderedDict) self.variable_by_name = variable_by_name @property def containing_function(self): return self @classmethod def get_function_class(cls, parser = None): return parser.Function def get_variable(self, name, default = UnboundLocalError, parser = None): variable = self.variable_by_name.get(name, None) if variable is None: container = self.container if container is not None: return container.get_variable(name, default = default, parser = parser) if default is UnboundLocalError: raise KeyError("Undefined value for {}".format(name)) variable = default return variable @classmethod def parse(cls, node, container = None, parser = None): try: children = node.children assert len(children) == 5 assert children[0].type == tokens.NAME and children[0].value == 'def' assert children[1].type == tokens.NAME # Function name name = children[1].value self = cls(container = container, name = name, node = node, parser = parser) self.parse_parameters() # Don't parse body now. Wait for first call (to know the values of the parameters). 
# self.parse_body() return self except: if node is not None: print "An exception occurred in node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) raise def parse_body(self): parser = self.parser children = self.node.children assert len(children) == 5 self.body_parsed = True body = parser.parse_suite(children[4], container = self) self.body[:] = body def parse_call(self, call): if not self.body_parsed: parser = self.parser positional_arguments = call.positional_arguments if call.star_argument is not None: positional_arguments = positional_arguments + list(call.star_argument.value.value) for argument_name, argument_value in itertools.izip(self.positional_parameters, positional_arguments): self.variable_by_name[argument_name].value = argument_value if self.star_name is not None: self.variable_by_name[self.star_name].value = parser.Tuple( container = self, parser = parser, value = tuple(positional_arguments[len(self.positional_parameters):]), ) named_arguments = call.named_arguments if call.keyword_argument is not None: named_arguments = named_arguments.copy() named_arguments.update(call.keyword_argument.value.value) keyword_argument = {} for argument_name, argument_value in named_arguments.iteritems(): if argument_name in self.named_parameters: self.variable_by_name[argument_name].value = argument_value else: keyword_argument[argument_name] = argument_value if self.keyword_name is not None: self.variable_by_name[self.keyword_name].value = parser.Dictionary( container = self, parser = parser, value = keyword_argument, ) self.parse_body() def parse_parameters(self): parser = self.parser children = self.node.children assert len(children) == 5 parameters = children[2] assert parameters.type == symbols.parameters parameters_children = parameters.children assert 2 <= len(parameters_children) <= 3, \ "Unexpected length {} of children in parameters statement:\n{}\n\n{}".format(len(parameters_children), repr(parameters), unicode(parameters).encode('utf-8')) 
assert parameters_children[0].type == tokens.LPAR and parameters_children[0].value == '(' if len(parameters_children) == 3: if parameters_children[1].type == tokens.NAME: # Single positional parameter typedargslist = None typedargslist_children = [parameters_children[1]] else: typedargslist = parameters_children[1] assert typedargslist.type == symbols.typedargslist typedargslist_children = typedargslist.children typedargslist_child_index = 0 while typedargslist_child_index < len(typedargslist_children): typedargslist_child = typedargslist_children[typedargslist_child_index] if typedargslist_child.type == tokens.DOUBLESTAR: typedargslist_child_index += 1 typedargslist_child = typedargslist_children[typedargslist_child_index] assert typedargslist_child.type == tokens.NAME, "Unexpected typedargslist child:\n{}\n\n{}".format( repr(typedargslist_child), unicode(typedargslist_child).encode('utf-8')) self.keyword_name = typedargslist_child.value self.variable_by_name[self.keyword_name] = parser.Variable(container = self, name = self.keyword_name, parser = parser) typedargslist_child_index += 1 if typedargslist_child_index >= len(typedargslist_children): break typedargslist_child = typedargslist_children[typedargslist_child_index] assert typedargslist_child.type == tokens.COMMA typedargslist_child_index += 1 elif typedargslist_child.type == tokens.STAR: typedargslist_child_index += 1 typedargslist_child = typedargslist_children[typedargslist_child_index] assert typedargslist_child.type == tokens.NAME, "Unexpected typedargslist child:\n{}\n\n{}".format( repr(typedargslist_child), unicode(typedargslist_child).encode('utf-8')) self.star_name = typedargslist_child.value self.variable_by_name[self.star_name] = parser.Variable(container = self, name = self.star_name, parser = parser) typedargslist_child_index += 1 if typedargslist_child_index >= len(typedargslist_children): break typedargslist_child = typedargslist_children[typedargslist_child_index] assert 
typedargslist_child.type == tokens.COMMA typedargslist_child_index += 1 else: assert typedargslist_child.type == tokens.NAME, "Unexpected typedargslist child:\n{}\n\n{}".format( repr(typedargslist_child), unicode(typedargslist_child).encode('utf-8')) parameter_name = typedargslist_child.value typedargslist_child_index += 1 if typedargslist_child_index >= len(typedargslist_children): # Last positional parameter self.positional_parameters.append(parameter_name) self.variable_by_name[parameter_name] = parser.Variable(container = self, name = parameter_name, parser = parser) break typedargslist_child = typedargslist_children[typedargslist_child_index] if typedargslist_child.type == tokens.COMMA: # Positional parameter self.positional_parameters.append(parameter_name) self.variable_by_name[parameter_name] = parser.Variable(container = self, name = parameter_name, parser = parser) typedargslist_child_index += 1 elif typedargslist_child.type == tokens.EQUAL: # Named parameter typedargslist_child_index += 1 typedargslist_child = typedargslist_children[typedargslist_child_index] self.named_parameters[parameter_name] = parser.parse_value(typedargslist_child, container = self) self.variable_by_name[parameter_name] = parser.Variable(container = self, name = parameter_name, parser = parser) typedargslist_child_index += 1 if typedargslist_child_index >= len(typedargslist_children): break typedargslist_child = typedargslist_children[typedargslist_child_index] assert typedargslist_child.type == tokens.COMMA typedargslist_child_index += 1 assert parameters_children[-1].type == tokens.RPAR and parameters_children[-1].value == ')' assert children[3].type == tokens.COLON and children[3].value == ':' # class FunctionCall(AbstractWrapper): # definition = None # variable_by_name = None # def __init__(self, node, definition = None, parser = None): # super(FunctionCall, self).__init__(node, parser = parser) # assert isinstance(definition, Function) # self.definition = definition # 
self.variable_by_name = collections.OrderedDict() # def get_variable(self, name, default = UnboundLocalError, parser = None): # variable = self.variable_by_name.get(name, None) # if variable is None: # container = self.definition.container # if container is not None: # return container.get_variable(name, default = default, parser = parser) # if default is UnboundLocalError: # raise KeyError("Undefined value for {}".format(name)) # variable = default # return variable class FunctionFileInput(AbstractWrapper): @classmethod def get_function_class(cls, parser = None): return parser.Function @classmethod def parse(cls, function, parser = None): source_lines, line_number = inspect.getsourcelines(function) source = textwrap.dedent(''.join(source_lines)) # print source node = parser.driver.parse_string(source) assert node.type == symbols.file_input, "Unexpected file input type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) == 2 and children[0].type == symbols.funcdef and children[1].type == tokens.ENDMARKER, \ "Unexpected node children in:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) python_module = inspect.getmodule(function) if parser.country_package is not None: assert python_module.__file__.startswith(os.path.dirname(parser.country_package.__file__)), \ "Requested class is defined outside country_package:\n{}".format(source) module = parser.python_module_by_name.get(python_module.__name__) if module is None: parser.python_module_by_name[python_module.__name__] = module = parser.Module(node, python = python_module, parser = parser) self = cls(parser = parser) function_class = self.get_function_class(parser = parser) try: return function_class.parse(children[0], container = module, parser = parser) except: if node is not None: print "An exception occurred in node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) raise class Holder(AbstractWrapper): column = None def __init__(self, 
class If(AbstractWrapper):
    """Wrapper around an ``if`` / ``elif`` / ``else`` statement (a lib2to3 ``if_stmt`` node)."""
    items = None  # List of (test, body) couples; test is None for the "else" clause

    def __init__(self, container = None, hint = None, node = None, items = None, parser = None):
        super(If, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(items, list)
        self.items = items

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build an If wrapper from an ``if_stmt`` node, one (test, body) couple per clause."""
        assert node.type == symbols.if_stmt, "Unexpected if statement type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        assert len(children) >= 4, "Unexpected length {} of children in if statement:\n{}\n\n{}".format(
            len(children), repr(node), unicode(node).encode('utf-8'))

        # Children are consumed from left to right; each clause is: reserved word, [test,] colon, suite.
        pending = collections.deque(children)
        clauses = []
        while pending:
            reserved_word = pending.popleft()
            assert reserved_word.type == tokens.NAME and reserved_word.value in ('if', 'elif', 'else'), \
                "Unexpected reserved word {}:\n{}\n\n{}".format(reserved_word.value, repr(node),
                unicode(node).encode('utf-8'))

            # Only "if" and "elif" are followed by a test.
            test = None if reserved_word.value == 'else' \
                else parser.parse_value(pending.popleft(), container = container)

            colon = pending.popleft()
            assert colon.type == tokens.COLON and colon.value == ':', "Unexpected colon {}:\n{}\n\n{}".format(
                colon.value, repr(node), unicode(node).encode('utf-8'))

            body = parser.parse_suite(pending.popleft(), container = container)
            clauses.append((test, body))
        return cls(container = container, items = clauses, node = node, parser = parser)
class Lambda(AbstractWrapper):
    """Wrapper around a ``lambda`` expression (a lib2to3 ``lambdef`` node).

    Only lambdas with a single positional parameter are accepted by ``parse``.
    """
    expression = None  # Wrapper of the expression returned by the lambda
    positional_parameters = None  # List of parameters names
    variable_by_name = None  # Ordered dictionary: parameter name => Variable wrapper

    def __init__(self, container = None, expression = None, hint = None, node = None, parser = None,
            positional_parameters = None, variable_by_name = None):
        super(Lambda, self).__init__(container = container, hint = hint, node = node, parser = parser)
        if expression is not None:
            assert isinstance(expression, AbstractWrapper)
            self.expression = expression
        if positional_parameters is None:
            positional_parameters = []
        else:
            assert isinstance(positional_parameters, list)
        self.positional_parameters = positional_parameters
        if variable_by_name is None:
            variable_by_name = collections.OrderedDict()
        else:
            assert isinstance(variable_by_name, collections.OrderedDict)
        self.variable_by_name = variable_by_name

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a Lambda wrapper from a ``lambdef`` node: ``lambda <parameter>: <expression>``."""
        assert node.type == symbols.lambdef, "Unexpected lambda definition type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        # Exactly 4 children are unpacked below, so any other length (for example a lambda without
        # parameter, which has 3 children) must be reported here rather than by a bare ValueError.
        assert len(children) == 4, "Unexpected length {} of children in lambda definition:\n{}\n\n{}".format(
            len(children), repr(node), unicode(node).encode('utf-8'))
        lambda_word, parameters, colon, expression = children

        # The wrapper is created before its expression is parsed, because it is the container (and the
        # scope) of its parameter.
        self = cls(container = container, node = node, parser = parser)

        assert lambda_word.type == tokens.NAME and lambda_word.value == 'lambda'
        if parameters.type == tokens.NAME:
            parameter_name = parameters.value
            self.positional_parameters.append(parameter_name)
            self.variable_by_name[parameter_name] = parser.Variable(container = self, name = parameter_name,
                parser = parser)
        else:
            assert False, "Unexpected parameters in lambda definition:\n{}\n\n{}".format(repr(node),
                unicode(node).encode('utf-8'))
        assert colon.type == tokens.COLON and colon.value == ':'
        self.expression = parser.parse_value(expression, container = self)

        return self

    @property
    def containing_function(self):
        """A lambda is its own containing function."""
        return self

    def get_variable(self, name, default = UnboundLocalError, parser = None):
        """Return the variable called ``name``, searching the parameters first, then the containers.

        When the lambda has a container, the lookup (including the handling of ``default``) is fully
        delegated to it. Otherwise ``default`` is returned, or ``KeyError`` is raised when no default was
        given (``UnboundLocalError`` is the "no default" sentinel).
        """
        variable = self.variable_by_name.get(name, None)
        if variable is None:
            container = self.container
            if container is not None:
                return container.get_variable(name, default = default, parser = parser)
            if default is UnboundLocalError:
                raise KeyError("Undefined value for {}".format(name))
            variable = default
        return variable
class ListGenerator(AbstractWrapper):
    """Wrapper of a list comprehension (``[value for variables in iterator]``)."""
    iterators = None  # List of the wrappers of the iterated expressions
    value = None  # Wrapper of the expression computed for each iteration
    variable_by_name = None  # Ordered dictionary of the loop variables

    def __init__(self, container = None, hint = None, iterators = None, node = None, parser = None, value = None,
            variable_by_name = None):
        super(ListGenerator, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(iterators, list)
        # Checked before the loop below, because the loop calls len() on it.
        assert isinstance(variable_by_name, collections.OrderedDict)
        self.iterators = iterators

        variables_value = []
        for iterator in iterators:
            guessed_iterator = iterator.guess(parser.UniformIterator)
            if guessed_iterator is None:
                if len(variable_by_name) == 1 and 'bar' in variable_by_name:
                    # Special case kept from the original code: a single loop variable named "bar" is given a
                    # TaxScale item.
                    guessed_iterator = parser.UniformIterator(
                        container = container,
                        items = [
                            parser.TaxScale(
                                container = container,
                                parser = parser,
                                ),
                            ],
                        parser = parser,
                        )
                else:
                    assert False, "{} has no iterator".format(iterator)
            variables_value.extend(guessed_iterator.items)

        if value is not None:
            assert isinstance(value, AbstractWrapper)
            self.value = value

        # Give each loop variable the item of the iterator found at the same position.
        for variable, variable_value in itertools.izip(variable_by_name.itervalues(), variables_value):
            variable.value = variable_value
        self.variable_by_name = variable_by_name
        # The loop variables are also declared in the container.
        container.variable_by_name.update(variable_by_name)

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a ListGenerator wrapper from a lib2to3 ``listmaker`` node.

        The first child is the computed expression; each following child must be a ``comp_for`` node.
        """
        assert node.type == symbols.listmaker, "Unexpected list type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        assert len(children) >= 2, "Unexpected length {} of children in list generator:\n{}\n\n{}".format(
            len(children), repr(node), unicode(node).encode('utf-8'))

        iterators = []
        variable_by_name = collections.OrderedDict()
        for child in children[1:]:
            if child.type == symbols.comp_for:
                for_node = child
                assert len(for_node.children) == 4, \
                    "Unexpected length {} of for children in list generator statement:\n{}\n\n{}".format(
                        len(for_node.children), repr(for_node), unicode(for_node).encode('utf-8'))
                for_word, variables, in_word, iterator = for_node.children
                assert for_word.type == tokens.NAME and for_word.value == 'for'

                if variables.type == symbols.exprlist:
                    # Names are at even positions, separated by commas at odd positions.
                    for variable_index, variable in enumerate(variables.children):
                        if variable_index & 1:
                            assert variable.type == tokens.COMMA, "Unexpected comma type:\n{}\n\n{}".format(
                                repr(variable), unicode(variable).encode('utf-8'))
                            continue
                        assert variable.type == tokens.NAME
                        variable_name = variable.value
                        variable_by_name[variable_name] = parser.Variable(container = container,
                            name = variable_name, parser = parser)
                elif variables.type == tokens.NAME:
                    variable_name = variables.value
                    variable_by_name[variable_name] = parser.Variable(container = container, name = variable_name,
                        parser = parser)
                else:
                    assert False, "Unexpected variables in list generator:\n{}\n\n{}".format(repr(node),
                        unicode(node).encode('utf-8'))

                assert in_word.type == tokens.NAME and in_word.value == 'in'
                iterators.append(parser.parse_value(iterator, container = container))
            else:
                assert False, "Unexpected item in list generator type:\n{}\n\n{}".format(repr(child),
                    unicode(child).encode('utf-8'))

        # The node is passed to the wrapper, as in the other parse methods.
        self = cls(container = container, iterators = iterators, node = node, parser = parser,
            variable_by_name = variable_by_name)
        # The value is parsed after the construction, once the loop variables are declared in the container.
        self.value = parser.parse_value(children[0], container = container)
        return self
parser.Variable(container = self, name = u'ENFS', parser = parser), floor = parser.Variable(container = self, name = u'floor', parser = parser), fromiter = parser.Variable(container = self, name = u'fromiter', parser = parser), fsolve = parser.Variable(container = self, name = u'fsolve', parser = parser), hasattr = parser.Variable(container = self, name = u'hasattr', parser = parser), holidays = parser.Variable(container = self, name = u'holidays', parser = parser), int16 = parser.Variable(container = self, name = u'int16', parser = parser, value = parser.Type(parser = parser, value = np.int16)), int32 = parser.Variable(container = self, name = u'int32', parser = parser, value = parser.Type(parser = parser, value = np.int32)), izip = parser.Variable(container = self, name = u'izip', parser = parser), law = parser.Variable(container = self, name = u'law', parser = parser, value = parser.CompactNode(parser = parser, value = parser.tax_benefit_system.legislation_json)), len = parser.Variable(container = self, name = u'len', parser = parser), log = parser.Variable(container = self, name = u'log', parser = parser, value = parser.Logger(parser = parser)), MarginalRateTaxScale = parser.Variable(container = self, name = u'MarginalRateTaxScale', parser = parser), max = parser.Variable(container = self, name = u'max', parser = parser), max_ = parser.Variable(container = self, name = u'max_', parser = parser), math = parser.Variable(container = self, name = u'math', parser = parser), min_ = parser.Variable(container = self, name = u'min_', parser = parser), not_ = parser.Variable(container = self, name = u'not_', parser = parser), ones = parser.Variable(container = self, name = u'ones', parser = parser), or_ = parser.Variable(container = self, name = u'or_', parser = parser), original_busday_count = parser.Variable(container = self, name = u'original_busday_count', parser = parser), PAC1 = parser.Variable(container = self, name = u'PAC1', parser = parser, value = 
parser.Number(parser = parser, value = 2)), PAC2 = parser.Variable(container = self, name = u'PAC2', parser = parser, value = parser.Number(parser = parser, value = 3)), PAC3 = parser.Variable(container = self, name = u'PAC3', parser = parser, value = parser.Number(parser = parser, value = 4)), PART = parser.Variable(container = self, name = u'PART', parser = parser, value = parser.Number(parser = parser, value = 1)), partial = parser.Variable(container = self, name = u'partial', parser = parser), PREF = parser.Variable(container = self, name = u'PREF', parser = parser, value = parser.Number(parser = parser, value = 0)), round = parser.Variable(container = self, name = u'round', parser = parser), round_ = parser.Variable(container = self, name = u'round_', parser = parser), # scale_tax_scales = parser.Variable(container = self, name = u'scale_tax_scales', parser = parser), SCOLARITE_COLLEGE = parser.Variable(container = self, name = u'SCOLARITE_COLLEGE', parser = parser, value = parser.Number(parser = parser, value = 1)), sorted = parser.Variable(container = self, name = u'sorted', parser = parser), startswith = parser.Variable(container = self, name = u'startswith', parser = parser), TAUX_DE_PRIME = parser.Variable(container = self, name = u'TAUX_DE_PRIME', parser = parser, value = parser.Number(parser = parser, value = 1 / 4)), # TaxScalesTree = parser.Variable(container = self, name = u'TaxScalesTree', parser = parser), timedelta64 = parser.Variable(container = self, name = u'timedelta64', parser = parser), ValueError = parser.Variable(container = self, name = u'ValueError', parser = parser), VOUS = parser.Variable(container = self, name = u'VOUS', parser = parser, value = parser.Number(parser = parser, value = 0)), where = parser.Variable(container = self, name = u'where', parser = parser), xor_ = parser.Variable(container = self, name = u'xor_', parser = parser), zeros = parser.Variable(container = self, name = u'zeros', parser = parser), zone_apl_by_depcom = 
parser.Variable(container = self, name = u'zone_apl_by_depcom', parser = parser), ).iteritems())) @property def containing_module(self): return self def get_variable(self, name, default = UnboundLocalError, parser = None): variable = self.variable_by_name.get(name, None) if variable is None: value = getattr(self.python, name, UnboundLocalError) if value is UnboundLocalError: if default is UnboundLocalError: raise KeyError("Undefined value for {}".format(name)) return default if not inspect.isfunction(value): # TODO? if default is UnboundLocalError: raise KeyError("Undefined value for {}".format(name)) return default # Declare function before parsing if to avoid infinite parsing when it is recursive. self.variable_by_name[name] = variable = parser.Variable(container = self, name = name, parser = parser) function = parser.FunctionFileInput.parse(value, parser = parser) assert isinstance(function, parser.Function), function variable.value = function return variable class NoneWrapper(AbstractWrapper): pass class NotTest(AbstractWrapper): value = None def __init__(self, container = None, hint = None, node = None, parser = None, value = None): super(NotTest, self).__init__(container = container, hint = hint, node = node, parser = parser) assert isinstance(value, AbstractWrapper) self.value = value def guess(self, expected): guessed = super(NotTest, self).guess(expected) if guessed is not None: return guessed parser = self.parser if issubclass(parser.Array, expected): array = self.value.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Boolean( parser = parser, ), entity_class = array.entity_class, parser = parser, ) elif issubclass(parser.Boolean, expected): return parser.Boolean( parser = parser, ) return None @classmethod def parse(cls, node, container = None, parser = None): assert node.type == symbols.not_test, "Unexpected not test type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert 
class Number(AbstractWrapper):
    """Wrapper of a numeric value."""
    type = None  # Optional type of the number, stored as given to the constructor
    value = None  # Optional Python value (bool, int or float)

    def __init__(self, container = None, node = None, parser = None, type = None, value = None):
        super(Number, self).__init__(container = container, node = node, parser = parser)
        if type is not None:
            self.type = type
        if value is not None:
            # The "type" parameter shadows the builtin, so the class of the value is read from the value itself.
            assert isinstance(value, (bool, int, float)), "Unexpected value for number: {} of type {}".format(value,
                value.__class__)
            self.value = value

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a Number wrapper from a lib2to3 ``NUMBER`` token.

        The literal is read as an integer when possible, otherwise as a float (ValueError is raised when it is
        neither).
        """
        assert node.type == tokens.NUMBER, "Unexpected number type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        try:
            # Base 0 lets int() follow the literal prefix, so hexadecimal, octal & binary literals are accepted.
            value = int(node.value, 0)
        except ValueError:
            value = float(node.value)
        return cls(container = container, node = node, parser = parser, value = value)
class Return(AbstractWrapper):
    """Wrapper of a ``return`` statement with a returned value."""
    value = None  # Wrapper of the returned value

    def __init__(self, container = None, hint = None, node = None, parser = None, value = None):
        super(Return, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(value, AbstractWrapper)
        self.value = value

    def guess(self, expected):
        """Guess from the wrapper itself or its hint first, then from the returned value."""
        guessed = super(Return, self).guess(expected)
        if guessed is None and self.value is not None:
            guessed = self.value.guess(expected)
        return guessed

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a Return wrapper from a lib2to3 ``return_stmt`` node & register it in its containing function."""
        assert node.type == symbols.return_stmt, "Unexpected return type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        assert len(children) == 2, len(children)
        return_word = children[0]
        assert return_word.type == tokens.NAME and return_word.value == 'return'
        returned_value = parser.parse_value(children[1], container = container)
        wrapper = cls(container = container, node = node, parser = parser, value = returned_value)
        # The containing function keeps the list of its return statements.
        wrapper.containing_function.returns.append(wrapper)
        return wrapper
class String(AbstractWrapper):
    """Wrapper of a string literal."""
    value = None  # String value, as a string

    def __init__(self, container = None, node = None, parser = None, value = None):
        super(String, self).__init__(container = container, node = node, parser = parser)
        if value is not None:
            assert isinstance(value, unicode), "Unexpected value for string: {} of type {}".format(value,
                type(value))
            self.value = value

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a String wrapper from a lib2to3 ``STRING`` token.

        The prefix and the quotes of the literal are removed. Escape sequences are kept as they are written in
        the source code.
        """
        assert node.type == tokens.STRING, "Unexpected string type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        value = node.value
        if isinstance(value, str):
            value = value.decode('utf-8')
        # Remove every prefix letter (u, U, r, R, b, B & their combinations), not only a lowercase "u". A literal
        # always has a quote after its prefix, so this never removes a character of the content.
        value = value.lstrip(u'bBrRuU')
        # Triple quotes are tested first, otherwise they would be handled as single quotes.
        for delimiter in (u'"""', u"'''", u'"', u"'"):
            if value.startswith(delimiter) and value.endswith(delimiter):
                value = value[len(delimiter):-len(delimiter)]
                break
        else:
            assert False, "Unknow delimiters for: {}".format(value)
        return cls(container = container, node = node, parser = parser, value = value)
class Term(AbstractWrapper):
    """Wrapper of a term: operands separated by ``*``, ``/``, ``//`` or ``%`` operators."""
    items = None  # Alternate list of operand wrappers (even positions) & operator strings (odd positions)

    def __init__(self, container = None, hint = None, items = None, node = None, parser = None):
        super(Term, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(items, list)
        items_count = len(items)
        assert items_count >= 3 and items_count % 2 == 1
        self.items = items

    def guess(self, expected):
        """Guess from the wrapper itself or its hint first, then as an array of numbers when an operand is an
        array."""
        guessed = super(Term, self).guess(expected)
        if guessed is not None:
            return guessed

        parser = self.parser
        if not issubclass(parser.Array, expected):
            return None
        # Operands are at even positions.
        for operand in self.items[::2]:
            array = operand.guess(parser.Array)
            if array is None:
                continue
            return parser.Array(
                cell = parser.Number(
                    parser = parser,
                    ),
                entity_class = array.entity_class,
                parser = parser,
                )
        return None

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a Term wrapper from a lib2to3 ``term`` node."""
        assert node.type == symbols.term, "Unexpected term type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        children = node.children
        assert len(children) >= 3 and (len(children) & 1), \
            "Unexpected length {} of children in term:\n{}\n\n{}".format(len(children), repr(node),
            unicode(node).encode('utf-8'))

        items = []
        for position, child in enumerate(children):
            if position % 2 == 0:
                # Operand
                items.append(parser.parse_value(child, container = container))
                continue
            # Operator
            assert child.type in (
                tokens.DOUBLESLASH,
                tokens.PERCENT,
                tokens.SLASH,
                tokens.STAR,
                ), "Unexpected operator type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8'))
            items.append(child.value)
        return cls(container = container, items = items, node = node, parser = parser)
class Tuple(AbstractWrapper):
    """Wrapper of a tuple of values."""
    value = None  # Tuple value, as a tuple

    def __init__(self, container = None, hint = None, node = None, parser = None, value = None):
        super(Tuple, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(value, tuple), "Unexpected value for tuple: {} of type {}".format(value,
            type(value))
        self.value = value

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a Tuple wrapper from a lib2to3 ``testlist`` node."""
        assert node.type == symbols.testlist, "Unexpected tuple type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))

        elements = []
        for position, child in enumerate(node.children):
            if position % 2 == 0:
                # Values are at even positions.
                elements.append(parser.parse_value(child, container = container))
            else:
                # Commas are at odd positions.
                assert child.type == tokens.COMMA, "Unexpected comma type:\n{}\n\n{}".format(repr(child),
                    unicode(child).encode('utf-8'))
        return cls(container = container, node = node, parser = parser, value = tuple(elements))
class Variable(AbstractWrapper):
    """Wrapper of a named variable and, when it is known, of its value."""
    name = None
    value = None  # A value wrapper

    def __init__(self, container = None, hint = None, name = None, node = None, parser = None, value = None):
        super(Variable, self).__init__(container = container, hint = hint, node = node, parser = parser)
        assert isinstance(name, basestring)
        self.name = name
        if value is not None:
            assert isinstance(value, AbstractWrapper)
            self.value = value

    def __repr__(self):
        # The previous template had no placeholder, so the representation was always an empty string.
        # "!r" keeps the result ASCII even when the name is a non-ASCII unicode string.
        return 'Variable({!r})'.format(self.name)

    def guess(self, expected):
        """Guess from the wrapper itself or its hint first, then from the value of the variable."""
        guessed = super(Variable, self).guess(expected)
        if guessed is not None:
            return guessed
        if self.value is not None:
            guessed = self.value.guess(expected)
            if guessed is not None:
                return guessed
        return None

    @classmethod
    def parse(cls, node, container = None, parser = None, value = None):
        """Build a Variable wrapper from a lib2to3 ``NAME`` token."""
        assert node.type == tokens.NAME, "Unexpected variable type:\n{}\n\n{}".format(repr(node),
            unicode(node).encode('utf-8'))
        return cls(container = container, name = node.value, node = node, parser = parser, value = value)
class FormulaFunction(Function):
    """Function of a formula class, with the parameters ``self``, ``simulation`` & ``period``."""

    @classmethod
    def parse(cls, node, container = None, parser = None):
        """Build a FormulaFunction wrapper from a lib2to3 function definition node.

        When parsing fails, the node is printed before the exception is raised again.
        """
        try:
            children = node.children
            assert len(children) == 5
            assert children[0].type == tokens.NAME and children[0].value == 'def'
            assert children[1].type == tokens.NAME  # Function name
            name = children[1].value
            self = cls(container = container, name = name, node = node, parser = parser)
            self.parse_parameters()
            self.parse_body()
            return self
        except Exception:
            if node is not None:
                # A single parenthesised argument prints the same text with the print statement and with the
                # print function.
                print("An exception occurred in node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')))
            raise

    def parse_parameters(self):
        """Parse the parameters, then give a value to the ``self``, ``simulation`` & ``period`` variables."""
        super(FormulaFunction, self).parse_parameters()
        parser = self.parser
        # Each assertion message shows the attribute that is checked.
        assert self.positional_parameters == ['self', 'simulation', 'period'], self.positional_parameters
        assert not self.named_parameters, self.named_parameters

        formula_variable = self.variable_by_name['self']
        assert formula_variable.value is None, formula_variable.value
        # The container of the function is given as the formula class.
        formula_variable.value = parser.Formula(container = self.container.container, formula_class = self.container,
            parser = self.parser)

        simulation_variable = self.variable_by_name['simulation']
        assert simulation_variable.value is None, simulation_variable.value
        simulation_variable.value = parser.Simulation(parser = self.parser)

        period_variable = self.variable_by_name['period']
        assert period_variable.value is None, period_variable.value
        period_variable.value = parser.Period(parser = self.parser, unit = u'day')
Period python_module_by_name = None Raise = Raise Return = Return Role = Role Simulation = Simulation StemNode = StemNode String = String # Structure = Structure tax_benefit_system = None TaxScale = TaxScale # TaxScalesTree = TaxScalesTree Term = Term Test = Test Tuple = Tuple TupleGenerator = TupleGenerator Type = Type UniformDictionary = UniformDictionary UniformIterator = UniformIterator # UniformList = UniformList Variable = Variable XorExpression = XorExpression def __init__(self, country_package = None, driver = None, tax_benefit_system = None): if country_package is not None: self.country_package = country_package self.driver = driver self.python_module_by_name = {} self.tax_benefit_system = tax_benefit_system @property def entity_class(self): if self.column is None: return None return self.tax_benefit_system.entity_class_by_key_plural[self.column.entity_key_plural] def get_cell_wrapper(self, container = None, type = None): wrapper_class = { None: self.Number, np.bool: self.Boolean, np.float32: self.Number, np.int16: self.Number, np.int32: self.Number, 'datetime64[D]': self.DateTime64, }[type] if wrapper_class is self.Number: return wrapper_class(container = container, parser = self, type = type) return wrapper_class(container = container, parser = self) def parse_power(self, node, container = None): assert isinstance(node, lib2to3.pytree.Base), "Invalid node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) assert isinstance(container, AbstractWrapper), "Invalid container {} for node:\n{}\n\n{}".format(container, repr(node), unicode(node).encode('utf-8')) assert node.type == symbols.power, "Unexpected power type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) >= 2, "Unexpected length {} of children in power:\n{}\n\n{}".format( len(children), repr(node), unicode(node).encode('utf-8')) subject = self.parse_value(children[0], container = container) for trailer in itertools.islice(children, 
1, None): assert trailer.type == symbols.trailer, "Unexpected trailer type:\n{}\n\n{}".format(repr(trailer), unicode(trailer).encode('utf-8')) trailer_children = trailer.children trailer_first_child = trailer_children[0] if trailer_first_child.type == tokens.DOT: subject = self.Attribute.parse(subject, trailer, container = container, parser = self) elif trailer_first_child.type == tokens.LPAR: if len(trailer_children) == 2: left_parenthesis, right_parenthesis = trailer_children arguments = None else: assert len(trailer_children) == 3, \ "Unexpected length {} of children in power call:\n{}\n\n{}".format(len(trailer_children), repr(trailer), unicode(trailer).encode('utf-8')) left_parenthesis, arguments, right_parenthesis = trailer_children assert left_parenthesis.type == tokens.LPAR, "Unexpected left parenthesis type:\n{}\n\n{}".format( repr(left_parenthesis), unicode(left_parenthesis).encode('utf-8')) assert right_parenthesis.type == tokens.RPAR, "Unexpected right parenthesis type:\n{}\n\n{}".format( repr(right_parenthesis), unicode(right_parenthesis).encode('utf-8')) subject = self.Call.parse(subject, arguments, container = container, parser = self) else: subject = self.Key.parse(subject, trailer, container = container, parser = self) return subject def parse_suite(self, node, container = None): assert isinstance(node, lib2to3.pytree.Base), "Invalid node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) assert isinstance(container, AbstractWrapper), "Invalid container {} for node:\n{}\n\n{}".format(container, repr(node), unicode(node).encode('utf-8')) if node.type == symbols.suite: children = node.children else: children = [node] # Suite is only a single statement. 
body = [] for child in children: if child.type == symbols.for_stmt: for_wrapper = self.For.parse(child, container = container, parser = self) body.append(for_wrapper) elif child.type == symbols.funcdef: function = container.get_function_class(parser = self).parse(child, container = container, parser = self) body.append(function) container.variable_by_name[function.name] = self.Variable(container = container, name = function.name, parser = self, value = function) elif child.type == symbols.if_stmt: if_wrapper = self.If.parse(child, container = container, parser = self) body.append(if_wrapper) elif child.type == symbols.simple_stmt: assert len(child.children) == 2, \ "Unexpected length {} for simple statement in function definition:\n{}\n\n{}".format( len(child.children), repr(child), unicode(child).encode('utf-8')) statement = child.children[0] if statement.type == symbols.assert_stmt: assert_statement = self.Assert.parse(statement, container = container, parser = self) body.append(assert_statement) elif statement.type == symbols.expr_stmt: assignment = self.Assignment.parse(statement, container = container, parser = self) body.append(assignment) elif statement.type == symbols.global_stmt: # TODO: Used only by zone_apl. 
pass elif statement.type == symbols.power: power = self.parse_power(statement, container = container) body.append(power) elif statement.type == symbols.raise_stmt: raise_statement = self.Raise.parse(statement, container = container, parser = self) body.append(raise_statement) elif statement.type == symbols.return_stmt: return_wrapper = self.Return.parse(statement, container = container, parser = self) body.append(return_wrapper) elif statement.type == tokens.NAME and statement.value == 'continue': continue_statement = self.Continue(container = container, node = statement, parser = self) body.append(continue_statement) elif statement.type == tokens.STRING: # Docstring string = self.String.parse(statement, container = container, parser = self) body.append(string) else: assert False, "Unexpected simple statement in suite:\n{}\n\n{}".format(repr(child), unicode(child).encode('utf-8')) assert child.children[1].type == tokens.NEWLINE and child.children[1].value == '\n' elif child.type == symbols.with_stmt: # TODO: Used only by zone_apl. 
pass elif child.type in (tokens.DEDENT, tokens.INDENT, tokens.NEWLINE): pass else: assert False, "Unexpected statement in suite:\n{}\n\n{}".format(repr(child), unicode(child).encode('utf-8')) return body def parse_value(self, node, container = None): assert isinstance(node, lib2to3.pytree.Base), "Invalid node:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) assert isinstance(container, AbstractWrapper), "Invalid container {} for node:\n{}\n\n{}".format(container, repr(node), unicode(node).encode('utf-8')) if node.type == symbols.and_expr: return self.AndExpression.parse(node, container = container, parser = self) if node.type == symbols.and_test: return self.AndTest.parse(node, container = container, parser = self) if node.type == symbols.arith_expr: return self.ArithmeticExpression.parse(node, container = container, parser = self) if node.type == symbols.atom: assert node.type == symbols.atom, "Unexpected atom type:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) children = node.children assert len(children) == 3, "Unexpected length {} of children in atom:\n{}\n\n{}".format(len(children), repr(node), unicode(node).encode('utf-8')) left_parenthesis, value, right_parenthesis = children assert left_parenthesis.type in (tokens.LBRACE, tokens.LPAR, tokens.LSQB), \ "Unexpected left parenthesis {} in atom:\n{}\n\n{}".format(left_parenthesis.value, repr(node), unicode(node).encode('utf-8')) assert right_parenthesis.type in (tokens.RBRACE, tokens.RPAR, tokens.RSQB), \ "Unexpected right parenthesis {} in atom:\n{}\n\n{}".format(right_parenthesis.value, repr(node), unicode(node).encode('utf-8')) if left_parenthesis.type == tokens.LPAR: value = self.parse_value(value, container = container) return self.ParentheticalExpression(container = container, node = node, parser = self, value = value) if value.type == symbols.dictsetmaker: dict_children = value.children child_index = 0 while child_index < len(dict_children): item_key = 
self.parse_value(dict_children[child_index], container = container) assert dict_children[child_index + 1].type == tokens.COLON, \ "Unexpected colon {} in atom:\n{}\n\n{}".format(dict_children[child_index + 1], repr(value), unicode(value).encode('utf-8')) item_value = self.parse_value(dict_children[child_index + 2], container = container) child_index += 3 if dict_children[child_index].type == tokens.COMMA: child_index += 1 else: assert child_index == len(dict_children), \ "Missing comma after dictionary item {} in atom:\n{}\n\n{}".format(child_index, repr(value), unicode(value).encode('utf-8')) # TODO: Currently it is assumed that dictionary is uniform. return self.UniformDictionary( container = container, key = item_key, parser = self, value = item_value, ) if value.type == symbols.listmaker: if any(child.type == symbols.comp_for for child in value.children): return self.ListGenerator.parse(value, container = container, parser = self) return self.List.parse(value, container = container, parser = self) singleton = self.parse_value(value, container = container) return self.List(container = container, node = value, parser = self, value = [singleton]) if node.type == symbols.comparison: return self.Comparison.parse(node, container = container, parser = self) if node.type == symbols.expr: return self.Expression.parse(node, container = container, parser = self) if node.type == symbols.factor: return self.Factor.parse(node, container = container, parser = self) if node.type == symbols.lambdef: return self.Lambda.parse(node, container = container, parser = self) if node.type == symbols.not_test: return self.NotTest.parse(node, container = container, parser = self) if node.type == symbols.power: return self.parse_power(node, container = container) if node.type == symbols.term: return self.Term.parse(node, container = container, parser = self) if node.type == symbols.test: return self.Test.parse(node, container = container, parser = self) if node.type == symbols.testlist: 
return self.Tuple.parse(node, container = container, parser = self) if node.type == symbols.testlist_gexp: return self.TupleGenerator.parse(node, container = container, parser = self) if node.type == tokens.NAME: name = node.value if name == u'False': return self.Boolean(container = container, parser = self, value = False) elif name == u'None': return self.NoneWrapper(container = container, parser = self) elif name == u'True': return self.Boolean(container = container, parser = self, value = True) variable = container.get_variable(name, default = None, parser = self) assert variable is not None, "Undefined variable: {}".format(name) return variable if node.type == tokens.NUMBER: return self.Number.parse(node, container = container, parser = self) if node.type == tokens.STRING: return self.String.parse(node, container = container, parser = self) assert False, "Unexpected value:\n{}\n\n{}".format(repr(node), unicode(node).encode('utf-8')) @property def person_class(self): for entity_class in self.tax_benefit_system.entity_class_by_key_plural.itervalues(): if entity_class.is_persons_entity: return entity_class return None PKdFd/openfisca_parsers/input_variables_extractors.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. 
# # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Extract input variables from Python formulas using lib2to3.""" import lib2to3.pgen2.driver import lib2to3.pygram import lib2to3.pytree import logging from openfisca_core import formulas from . import formulas_parsers_2to3 log = logging.getLogger(__name__) class Attribute(formulas_parsers_2to3.Attribute): def __init__(self, container = None, hint = None, name = None, node = None, parser = None, subject = None): super(Attribute, self).__init__(container = container, hint = hint, name = name, node = node, parser = parser, subject = subject) compact_node = self.subject.guess(parser.CompactNode) if compact_node is not None: parser.parameters.add(tuple(compact_node.iter_names()) + (self.name,)) class Call(formulas_parsers_2to3.Call): def __init__(self, container = None, hint = None, keyword_argument = None, named_arguments = None, node = None, parser = None, positional_arguments = None, star_argument = None, subject = None): super(Call, self).__init__(container = container, hint = hint, keyword_argument = keyword_argument, named_arguments = named_arguments, node = node, parser = parser, positional_arguments = positional_arguments, star_argument = star_argument, subject = subject) if self.subject.name in ('calculate', 'calculate_add', 'calculate_add_divide', 'calculate_divide', 'compute', 'compute_add', 'compute_add_divide', 'compute_divide', 'get_array'): # TODO: Guess input_variable instead of assuming that it is a string with a "value" attribute. input_variable = self.positional_arguments[0] while isinstance(input_variable, parser.Variable): input_variable = input_variable.value if input_variable is None: # print "Missing input variable name while calling {} in {}".format(self.subject.name, # parser.column.name) return elif isinstance(input_variable, parser.Attribute) and input_variable.name == '__name__': # Assume this is "self.__class__.__name__". 
input_variable_name = parser.column.name assert input_variable_name is not None parser.input_variables.add(input_variable_name) return elif isinstance(input_variable, parser.String): input_variable_name = input_variable.value # Note: input_variable_name may be None when parsing salbrut, chobrut & rstbrut. if input_variable_name is not None: parser.input_variables.add(input_variable_name) return assert False, "Unexpected class for input variable: {}".format(input_variable) class Parser(formulas_parsers_2to3.Parser): Attribute = Attribute Call = Call def get_input_variables_and_parameters(self, column): formula_class = column.formula_class assert formula_class is not None, "Column {} has no formula".format(column.name) if issubclass(formula_class, formulas.AbstractEntityToEntity): return set([formula_class.variable_name]), set() if issubclass(formula_class, formulas.SimpleFormula) and formula_class.function is None: # Input variable return None, None self.column = column self.input_variables = input_variables = set() self.parameters = parameters = set() try: self.FormulaClassFileInput.parse(formula_class, parser = self) except AssertionError: # When parsing fails, assume that all input variables have already been parsed. pass for names_tuple in parameters.copy(): for i in range(len(names_tuple)): parameters.discard(names_tuple[:i]) parameters = set( u'.'.join(names_tuple) for names_tuple in parameters ) del self.column del self.input_variables del self.parameters self.python_module_by_name.clear() return input_variables, parameters def setup(tax_benefit_system): return Parser( driver = lib2to3.pgen2.driver.Driver(lib2to3.pygram.python_grammar, convert = lib2to3.pytree.convert, logger = log), tax_benefit_system = tax_benefit_system, ) PK1ZF%openfisca_parsers/scripts/__init__.pyPK(F-U.openfisca_parsers/scripts/formulas_to_julia.py#! 
/usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Convert Python formulas to Julia using lib2to3.""" import argparse import codecs import collections import datetime import importlib import inspect import itertools import lib2to3.pgen2.driver # , tokenize, token import lib2to3.pygram import lib2to3.pytree import logging import os import sys import textwrap import traceback import numpy as np from openfisca_core import formulas from openfisca_parsers import formulas_parsers_2to3 app_name = os.path.splitext(os.path.basename(__file__))[0] julia_file_header = textwrap.dedent(u"""\ # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. 
# # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """) log = logging.getLogger(app_name) name_by_role_by_entity_key_singular = dict( famille = { 0: u'CHEF', }, foyer_fiscal = { 0: u'VOUS', }, ) # Abstract Wrappers class JuliaCompilerMixin(object): def testize(self, allow_array = False): container = self.container parser = self.parser if self.guess(parser.Dictionary) is not None or self.guess(parser.List) is not None \ or self.guess(parser.Tuple) is not None or self.guess(parser.UniformDictionary) is not None: return parser.NotTest( container = container, parser = parser, value = parser.Call( container = container, parser = parser, positional_arguments = [self], subject = parser.Variable( container = container, name = u'isempty', parser = parser, ), ), ) if self.guess(parser.Boolean) is not None: return self if self.guess(parser.Number) is not None: return parser.Comparison( container = container, left = parser.ParentheticalExpression( container = container, parser = parser, value = self, ), operator = u'!=', parser = parser, right = parser.Number( container = container, parser = parser, value = 0, ), ) if allow_array: array = self.guess(parser.Array) if array is not None: cell = array.cell if cell.guess(parser.Boolean) is not None: return self if cell.guess(parser.Number) is not None: return parser.Comparison( container = container, left = parser.ParentheticalExpression( container = container, parser = parser, value = self, ), operator = u'.!=', parser = parser, right = parser.Number( container = container, parser = parser, value = 0, ), ) assert False, "{} has a non-boolean value: {}\n{}".format(self.__class__.__name__, unicode(self.node).encode('utf-8'), 
self.__dict__) # Concrete Wrappers class AndTest(JuliaCompilerMixin, formulas_parsers_2to3.AndTest): def juliaize(self): return self.__class__( container = self.container, hint = self.hint, operands = [ operand.juliaize() for operand in self.operands ], operator = self.operator, parser = self.parser, ) def source_julia(self, depth = 0): return u' && '.join( operand.source_julia(depth = depth) for operand in self.operands ) class AndExpression(JuliaCompilerMixin, formulas_parsers_2to3.AndExpression): def juliaize(self): return self.__class__( container = self.container, hint = self.hint, operands = [ operand.juliaize().testize(allow_array = True) for operand in self.operands ], operator = self.operator, parser = self.parser, ) def source_julia(self, depth = 0): return u' {} '.format(self.operator).join( operand.source_julia(depth = depth) for operand in self.operands ) class ArithmeticExpression(JuliaCompilerMixin, formulas_parsers_2to3.ArithmeticExpression): def juliaize(self): return self.__class__( container = self.container, hint = self.hint, items = [ item if item_index & 1 else item.juliaize() for item_index, item in enumerate(self.items) ], parser = self.parser, ) def source_julia(self, depth = 0): array_expression = self.guess(self.parser.Array) is not None return u' '.join( (u'.{}'.format(item) if array_expression else item) if item_index & 1 else item.source_julia(depth = depth) for item_index, item in enumerate(self.items) ) class Array(JuliaCompilerMixin, formulas_parsers_2to3.Array): def juliaize(self): return self class Assert(JuliaCompilerMixin, formulas_parsers_2to3.Assert): def juliaize(self): return self.__class__( container = self.container, error = self.error.juliaize() if self.error is not None else None, hint = self.hint, parser = self.parser, test = self.test.juliaize(), ) def source_julia(self, depth = 0): return u'@assert {test}{error}'.format( error = u', {}'.format(self.error.source_julia(depth = depth + 1)) if self.error is not None else 
u'', test = self.test.source_julia(depth = depth + 1), ) class Assignment(JuliaCompilerMixin, formulas_parsers_2to3.Assignment): def juliaize(self): container = self.container parser = self.parser # New variables are not created, but the value of the existing ones are replaced (using right value), otherwise # all references to this variable (in other expressions) will continue using the non julialized version. left = self.left operator = self.operator right = [ right_item.juliaize() for right_item in self.right ] if len(left) == len(right) and operator == '=': for left_item, right_item in itertools.izip(left, right): if isinstance(left_item, parser.Variable): assert left_item is not right_item left_item.value = right_item if len(left) == 1: variable = left[0] if isinstance(variable, parser.Variable) and isinstance(variable.value, parser.Call): call = variable.value call_subject = call.subject if isinstance(call_subject, parser.Attribute): method_name = call_subject.name if method_name in ('calculate', 'calculate_add', 'calculate_add_divide', 'calculate_divide', 'compute', 'compute_add', 'compute_add_divide', 'compute_divide'): method_julia_name = dict( compute = u'calculate', compute_add = u'calculate_add', compute_add_divide = u'calculate_add_divide', compute_divide = u'calculate_divide', ).get(method_name, method_name) variable_name = variable.name # if variable_name.endswith(u'_holder'): # variable_name = variable_name[:-len(u'_holder')] method_subject = call_subject.subject if method_subject.guess(parser.Simulation): assert len(call.positional_arguments) >= 1, call.positional_arguments assert len(call.named_arguments) <= 1, call.named_arguments if len(call.named_arguments) == 1: assert u'accept_other_period' in call.named_arguments requested_variable = call.positional_arguments[0] if isinstance(requested_variable, parser.String): if requested_variable.value == variable_name: # @calculate(x, ...) 
return parser.Call( container = container, hint = call.hint, named_arguments = call.named_arguments, parser = parser, positional_arguments = [ parser.Variable( container = container, # hint = parser.ArrayHandle(parser = parser), TODO name = requested_variable.value, parser = parser, ), ] + call.positional_arguments[1:], subject = parser.Variable( # TODO: Use Macro or MacroCall. name = u'@{}'.format(method_julia_name), parser = parser, ), ) # y = calculate("x", ...) return parser.Assignment( container = container, left = [ parser.Variable(container = container, name = variable_name, parser = parser), ], operator = u'=', parser = parser, right = [ parser.Call( container = container, hint = call.hint, named_arguments = call.named_arguments, parser = parser, positional_arguments = [ method_subject, requested_variable, ] + call.positional_arguments[1:], subject = parser.Variable( # TODO: Use function call. name = method_julia_name, parser = parser, ), ), ], ) elif method_name == 'get_array': method_subject = call_subject.subject if method_subject.guess(parser.Simulation): assert len(call.positional_arguments) >= 1, call.positional_arguments assert len(call.named_arguments) == 0, call.named_arguments requested_variable = call.positional_arguments[0] if isinstance(requested_variable, parser.String): if requested_variable.value == variable.name: # @variable_at(x, ...) return parser.Call( container = container, hint = call.hint, parser = parser, positional_arguments = [ parser.Variable( container = container, # hint = parser.ArrayHandle(parser = parser), TODO name = requested_variable.value, parser = parser, ), ] + call.positional_arguments[1:] + [ # Add nothing as default value. parser.NoneWrapper( container = container, parser = parser, ), ], subject = parser.Variable( # TODO: Use Macro or MacroCall. name = u'@variable_at', parser = parser, ), ) # y = variable_at("x", ...) 
return parser.Assignment( container = container, left = [ parser.Variable(container = container, name = variable.name, parser = parser), ], operator = u'=', parser = parser, right = [ parser.Call( container = container, hint = call.hint, parser = parser, positional_arguments = [ method_subject, requested_variable, ] + call.positional_arguments[1:] + [ # Add nothing as default value. parser.NoneWrapper( container = container, parser = parser, ), ], subject = parser.Variable( # TODO: Use function call. name = u'variable_at', parser = parser, ), ), ], ) return self.__class__( container = container, hint = self.hint, left = left, operator = operator, parser = self.parser, right = right, ) def source_julia(self, depth = 0): left_str = u', '.join( left_item.source_julia(depth = depth + 1) for left_item in self.left ) right_str = u', '.join( right_item.source_julia(depth = depth + 1) for right_item in self.right ) return u'{} {} {}'.format(left_str, self.operator, right_str) class Attribute(JuliaCompilerMixin, formulas_parsers_2to3.Attribute): def juliaize(self): parser = self.parser subject = self.subject.juliaize() parent_node = subject.guess(parser.CompactNode) if parent_node is not None: key = self.name assert key is not None key = unicode(key) if key not in ('iterkeys', 'iteritems', 'itervalues', 'keys', 'items', 'values'): node_value = parent_node.value['children'].get(key) if node_value is None: # Dirty hack for tax_hab formula. 
if key == u'taux': node_value = parent_node.value['children']['taux_plein'] else: assert key in (u'taux_plein', u'taux_reduit'), key node_value = parent_node.value['children']['taux'] node_type = node_value['@type'] if node_type == u'Node': hint = parser.CompactNode( is_reference = parent_node.is_reference, name = unicode(key), parent = parent_node, parser = parser, value = node_value, ) elif node_type == u'Parameter': if node_value.get('format') == 'boolean': hint = parser.Boolean( parser = parser, ) else: hint = parser.Number( parser = parser, ) else: assert node_type == u'Scale' hint = parser.TaxScale( parser = parser, ) return parser.Key( container = self.container, hint = hint, parser = parser, subject = subject, value = parser.String( container = self.container, parser = parser, value = key, ).juliaize(), ) elif subject.guess(parser.Instant) is not None: if self.name in (u'day', u'month', u'year'): return parser.Call( container = self.container, hint = parser.Number(parser = parser), parser = parser, positional_arguments = [subject], subject = parser.Variable( name = self.name, parser = parser, ), ) elif subject.guess(parser.Period) is not None: if self.name == u'stop': return parser.Call( container = self.container, hint = parser.Instant(parser = parser), parser = parser, positional_arguments = [subject], subject = parser.Variable( name = u'stop_date', parser = parser, ), ) else: formula = subject.guess(parser.Formula) if formula is not None: if self.name == u'__class__': return parser.Variable( name = u'variable.definition', parser = parser, value = self ) elif self.name == u'holder': return parser.Variable( # hint = parser.Holder( # column = formula.column, # parser = parser, # ), name = u'variable', parser = parser, value = self ) formula_class = subject.guess(parser.FormulaClass) if formula_class is not None: if self.name == u'__name__': return self.__class__( container = self.container, hint = self.hint, name = u'name', parser = parser, subject = 
subject, ) holder = subject.guess(parser.Holder) if holder is not None: if self.name == u'entity': return parser.Call( container = self.container, hint = parser.Entity( entity_class = parser.tax_benefit_system.entity_class_by_key_plural[ holder.column.entity_key_plural], parser = parser, ), parser = parser, positional_arguments = [subject], subject = parser.Variable( name = u'get_entity', parser = parser, ), ) parent_node = subject.guess(parser.StemNode) if parent_node is not None: hint = parser.StemNode( is_reference = parent_node.is_reference, parent = parent_node, parser = parser, ) return self.__class__( container = self.container, hint = self.hint, name = self.name, parser = parser, subject = subject, ) def source_julia(self, depth = 0): return u"{}.{}".format( self.subject.source_julia(depth = depth), self.name, ) class Boolean(JuliaCompilerMixin, formulas_parsers_2to3.Boolean): def juliaize(self): return self def source_julia(self, depth = 0): return u'true' if self.value else u'false' class Call(JuliaCompilerMixin, formulas_parsers_2to3.Call): def guess(self, expected): guessed = super(Call, self).guess(expected) if guessed is not None: return guessed parser = self.parser if issubclass(parser.Array, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name in (u'any_person_in_entity', u'sum_person_in_entity'): variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ) elif function.name == u'entity_to_person': variable = 
self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.Array( cell = cell_wrapper, entity_class = parser.person_class, parser = parser, ) elif function.name in (u'max', u'min'): for argument in self.positional_arguments: array = argument.guess(parser.Array) if array is not None: return parser.Array( cell = parser.Number( parser = parser, ), entity_class = array.entity_class, parser = parser, ) elif function.name == u'single_person_in_entity': variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ) elif function.name == u'zeros': assert len(self.positional_arguments) <= 2, self.positional_arguments assert len(self.named_arguments) <= 1, self.named_arguments dtype_wrapper = self.named_arguments.get('dtype') if dtype_wrapper is None: cell_type = None else: dtype_wrapper = dtype_wrapper.guess(parser.String) or dtype_wrapper.guess(parser.Type) cell_type = dtype_wrapper.value cell_wrapper = parser.get_cell_wrapper(container = self.container, type = cell_type) return parser.Array( cell = cell_wrapper, parser = parser, ) elif issubclass(parser.Boolean, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name in 
(u'all', u'any', u'isempty'): return parser.Boolean( parser = parser, ) elif issubclass(parser.Number, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == 'length': return parser.Number(parser = parser) elif issubclass(parser.UniformDictionary, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == u'split_person_by_role': variable = self.positional_arguments[0].guess(parser.Variable) if variable is None: cell_wrapper = None else: variable_name = variable.name if variable_name.endswith(u'_holder'): variable_name = variable_name[:-len(u'_holder')] tax_benefit_system = parser.tax_benefit_system column = tax_benefit_system.column_by_name[variable_name] cell_wrapper = parser.get_cell_wrapper(container = self.container, type = column.dtype) return parser.UniformDictionary( julia = True, key = parser.Role( parser = parser, ), parser = parser, value = parser.Array( cell = cell_wrapper, entity_class = parser.entity_class, parser = parser, ), ) elif issubclass(parser.UniformIterator, expected): function = self.subject.guess(parser.Variable) if function is not None: if function.name == u'keys': uniform_dictionary = self.positional_arguments[0].guess(parser.UniformDictionary) if uniform_dictionary is not None: return parser.UniformIterator( items = [uniform_dictionary.key], parser = parser, ) elif function.name == 'values': uniform_dictionary = self.positional_arguments[0].guess(parser.UniformDictionary) if uniform_dictionary is not None: return parser.UniformIterator( items = [uniform_dictionary.value], parser = parser, ) uniform_dictionary = self.guess(parser.UniformDictionary) if uniform_dictionary is not None: uniform_iterator = uniform_dictionary.guess(parser.UniformIterator) if uniform_iterator is not None: return uniform_iterator return None def juliaize(self): container = self.container parser = self.parser keyword_argument = self.keyword_argument.juliaize() if 
self.keyword_argument is not None else None named_arguments = collections.OrderedDict( (parser.juliaize_name(argument_name), argument_value.juliaize()) for argument_name, argument_value in self.named_arguments.iteritems() ) positional_arguments = [ argument_value.juliaize() for argument_value in self.positional_arguments ] star_argument = self.star_argument.juliaize() if self.star_argument is not None else None subject = self.subject.juliaize() if isinstance(subject, parser.Attribute): method_name = subject.name if method_name in ('all', 'any'): method_subject = subject.subject if method_subject.guess(parser.Array): return parser.Call( container = container, hint = self.hint, keyword_argument = keyword_argument, named_arguments = named_arguments, parser = parser, positional_arguments = [method_subject.testize(allow_array = True)] + positional_arguments, star_argument = star_argument, subject = parser.Variable( name = method_name, parser = parser, ), ) elif method_name == 'any_by_roles': method_subject = subject.subject if isinstance(method_subject, parser.Variable) and method_subject.name == 'self': assert len(positional_arguments) == 1, positional_arguments assert len(named_arguments) == 0, named_arguments requested_variable = positional_arguments[0] # any_person_in_entity(x, get_entity(variable), period) return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = [ requested_variable, parser.Variable( container = container, name = u'get_entity(variable)', parser = parser, ), parser.Variable( container = container, name = u'period', parser = parser, ), ], subject = parser.Variable( # TODO: Use function call. 
name = u'any_person_in_entity', parser = parser, ), ) elif method_name == 'astype': assert len(positional_arguments) == 1, positional_arguments argument_string = positional_arguments[0].guess(parser.String) if argument_string is not None: if argument_string.value == u'timedelta64[M]': return parser.Call( container = container, parser = parser, positional_arguments = [ parser.Term( items = [ parser.Call( container = container, parser = parser, positional_arguments = [ subject.subject, ], subject = parser.Variable( name = u'int', parser = parser, ), ), u'*', parser.Number( parser = parser, value = 12, ), u'/', parser.Number( parser = parser, value = 365.25, ), ], parser = parser, ), ], subject = parser.Variable( name = u'floor', parser = parser, ), ) elif argument_string.value == u'timedelta64[Y]': return parser.Call( container = container, parser = parser, positional_arguments = [ parser.Term( items = [ parser.Call( container = container, parser = parser, positional_arguments = [ subject.subject, ], subject = parser.Variable( name = u'int', parser = parser, ), ), u'/', parser.Number( parser = parser, value = 365.25, ), ], parser = parser, ), ], subject = parser.Variable( name = u'floor', parser = parser, ), ) else: argument_variable = positional_arguments[0].guess(parser.Variable) if argument_variable is not None: if argument_variable.name == u'int16': return parser.Call( container = container, parser = parser, positional_arguments = [subject.subject], subject = parser.Variable( name = u'int16', parser = parser, ), ) elif method_name == 'calc': method_subject = subject.subject return parser.Call( container = container, hint = self.hint, keyword_argument = keyword_argument, named_arguments = named_arguments, parser = parser, positional_arguments = [method_subject] + positional_arguments, star_argument = star_argument, subject = parser.Variable( name = u'apply_tax_scale', parser = parser, ), ) elif method_name in ('cast_from_entity_to_role', 
'cast_from_entity_to_roles'): method_subject = subject.subject if isinstance(method_subject, parser.Variable) and method_subject.name == 'self': assert len(positional_arguments) == 1, positional_arguments assert len(named_arguments) <= 1, named_arguments if len(named_arguments) == 1: assert 'role' in named_arguments or 'roles' in named_arguments roles_arguments = [ (named_arguments.get('role') or named_arguments.get('roles')), ] else: roles_arguments = [] requested_variable = positional_arguments[0] # entity_to_person(x, period, role) return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = [ requested_variable, parser.Variable( container = container, name = u'period', parser = parser, ), ] + roles_arguments, subject = parser.Variable( # TODO: Use function call. name = u'entity_to_person', parser = parser, ), ) elif method_name == 'filter_role': method_subject = subject.subject if isinstance(method_subject, parser.Variable) and method_subject.name == 'self': assert len(positional_arguments) == 1, positional_arguments assert len(named_arguments) == 1, named_arguments assert 'role' in named_arguments requested_variable = positional_arguments[0] # single_person_in_entity(x, get_entity(variable), period, role) return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = [ requested_variable, parser.Variable( container = container, name = u'get_entity(variable)', parser = parser, ), parser.Variable( container = container, name = u'period', parser = parser, ), named_arguments['role'], ], subject = parser.Variable( # TODO: Use function call. 
name = u'single_person_in_entity', parser = parser, ), ) elif method_name == 'floor': method_subject = subject.subject if isinstance(method_subject, parser.Variable) and method_subject.name == 'math': assert len(named_arguments) == 0, named_arguments return parser.Call( container = container, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'floor', parser = parser, ), ) elif method_name == 'get': method_subject = subject.subject assert len(named_arguments) == 0, named_arguments assert 1 <= len(positional_arguments) <= 2, positional_arguments return parser.Call( container = container, parser = parser, positional_arguments = [method_subject] + positional_arguments + ([ parser.Variable( name = u'nothing', parser = parser, ), ] if len(positional_arguments) < 2 else []), subject = parser.Variable( name = u'get', parser = parser, ), ) elif method_name == 'iterkeys': method_subject = subject.subject assert len(named_arguments) == 0, named_arguments assert len(positional_arguments) == 0, positional_arguments return parser.Call( container = container, parser = parser, positional_arguments = [method_subject], subject = parser.Variable( name = u'keys', parser = parser, ), ) elif method_name == 'iteritems': method_subject = subject.subject assert len(named_arguments) == 0, named_arguments assert len(positional_arguments) == 0, positional_arguments return method_subject elif method_name == 'itervalues': method_subject = subject.subject assert len(named_arguments) == 0, named_arguments assert len(positional_arguments) == 0, positional_arguments return parser.Call( container = container, parser = parser, positional_arguments = [method_subject], subject = parser.Variable( name = u'values', parser = parser, ), ) elif method_name == 'legislation_at': method_subject = subject.subject if method_subject.guess(parser.Simulation): assert len(positional_arguments) == 1, positional_arguments instant = 
positional_arguments[0].guess(parser.Instant) if instant is not None: assert len(named_arguments) <= 1, named_arguments reference = named_arguments.get('reference') return parser.Call( container = container, hint = parser.CompactNode( is_reference = bool(reference), parser = parser, value = parser.tax_benefit_system.legislation_json, ), named_arguments = dict(reference = reference) if reference is not None else None, parser = parser, positional_arguments = [method_subject, positional_arguments[0]], subject = parser.Variable( name = u'legislation_at', parser = parser, ), ) elif method_name == 'offset': method_subject = subject.subject if method_subject.guess(parser.Instant): assert len(positional_arguments) == 2, positional_arguments delta, unit = positional_arguments if isinstance(delta, (parser.Factor, parser.Number)) and isinstance(unit, parser.String) \ and unit.value is not None: if isinstance(delta, parser.Factor): assert delta.operator == u'-' delta = -int(delta.operand.value) else: delta = int(delta.value) return parser.ParentheticalExpression( container = container, parser = parser, value = parser.ArithmeticExpression( container = container, hint = parser.Instant(parser = parser), items = [ method_subject, u'+' if delta >= 0 else u'-', parser.Call( container = container, parser = parser, positional_arguments = [ parser.Number( parser = parser, value = abs(delta), ), ], subject = parser.Variable( name = dict( day = u'Day', month = u'Month', year = u'Year', )[unit.value], parser = parser, ), ), ], parser = parser, ), ) elif isinstance(delta, parser.String) and delta.value == 'first-of': if isinstance(unit, parser.String) and unit.value == 'month': return parser.Call( container = container, hint = parser.Instant(parser = parser), parser = parser, positional_arguments = [method_subject], subject = parser.Variable( name = u'firstdayofmonth', parser = parser, ), ) if isinstance(unit, parser.String) and unit.value == 'year': return parser.Call( container = 
container, hint = parser.Instant(parser = parser), parser = parser, positional_arguments = [method_subject], subject = parser.Variable( name = u'firstdayofyear', parser = parser, ), ) elif method_subject.guess(parser.Period): unit = method_subject.guess(parser.Period).unit assert len(positional_arguments) == 1, positional_arguments delta = positional_arguments[0] if isinstance(delta, (parser.Factor, parser.Number)): if isinstance(delta, parser.Factor): assert delta.operator == u'-' delta = -int(delta.operand.value) else: delta = int(delta.value) return parser.ArithmeticExpression( container = container, hint = parser.Period(parser = parser, unit = unit), items = [ method_subject, u'+' if delta >= 0 else u'-', parser.Call( container = container, parser = parser, positional_arguments = [ parser.Number( parser = parser, value = abs(delta), ), ], subject = parser.Variable( name = dict( day = u'Day', month = u'Month', year = u'Year', )[unit], parser = parser, ), ), ], parser = parser, ) elif isinstance(delta, parser.String) and delta.value == 'first-of': return parser.Call( container = container, hint = parser.Period(parser = parser, unit = unit), parser = parser, positional_arguments = [method_subject], subject = parser.Variable( name = u'first_day', parser = parser, ), ) elif method_name == 'period': method_subject = subject.subject if method_subject.guess(parser.Instant): assert len(positional_arguments) >= 1, positional_arguments unit = positional_arguments[0] if isinstance(unit, parser.String) and unit.value == 'month': return parser.Call( container = container, hint = parser.Period(parser = parser, unit = unit.value), parser = parser, positional_arguments = [method_subject] + positional_arguments[1:], subject = parser.Variable( name = u'MonthPeriod', parser = parser, ), ) if isinstance(unit, parser.String) and unit.value == 'year': return parser.Call( container = container, hint = parser.Period(parser = parser, unit = unit.value), parser = parser, 
positional_arguments = [method_subject] + positional_arguments[1:], subject = parser.Variable( name = u'YearPeriod', parser = parser, ), ) elif method_name == 'split_by_roles': method_subject = subject.subject if isinstance(method_subject, parser.Variable) and method_subject.name == 'self': assert len(positional_arguments) == 1, positional_arguments requested_variable = positional_arguments[0] assert len(named_arguments) <= 1, named_arguments if len(named_arguments) == 1: assert 'roles' in named_arguments roles_arguments = [named_arguments['roles']] else: roles_arguments = [] # split_person_by_role(x, get_entity(variable), period, roles) return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = [ requested_variable, parser.Variable( container = container, name = u'get_entity(variable)', parser = parser, ), parser.Variable( container = container, name = u'period', parser = parser, ), ] + roles_arguments, subject = parser.Variable( # TODO: Use function call. name = u'split_person_by_role', parser = parser, ), ) elif method_name == 'sum_by_entity': method_subject = subject.subject if isinstance(method_subject, parser.Variable) and method_subject.name == 'self': assert len(positional_arguments) == 1, positional_arguments assert len(named_arguments) <= 1, named_arguments if len(named_arguments) == 1: roles = named_arguments.get('roles') assert roles is not None, named_arguments else: roles = None requested_variable = positional_arguments[0] # sum_person_in_entity(x, get_entity(variable), period) return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = [ requested_variable, parser.Variable( container = container, name = u'get_entity(variable)', parser = parser, ), parser.Variable( container = container, name = u'period', parser = parser, ), ] + ([roles] if roles is not None else []), subject = parser.Variable( # TODO: Use function call. 
name = u'sum_person_in_entity', parser = parser, ), ) elif isinstance(subject, parser.Variable): function_name = subject.name if function_name == 'and_': assert len(positional_arguments) == 2, positional_arguments left, right = positional_arguments return parser.ParentheticalExpression( container = container, parser = parser, value = parser.AndExpression( container = container, operands = [ parser.ParentheticalExpression( container = container, parser = parser, value = left.testize(allow_array = True), ), parser.ParentheticalExpression( container = container, parser = parser, value = right.testize(allow_array = True), ), ], operator = u'&', parser = parser, ), ) elif function_name == 'around': assert len(positional_arguments) == 1, positional_arguments return parser.Call( container = container, hint = positional_arguments[0].hint, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'round', parser = parser, ), ) elif function_name == 'date': assert len(positional_arguments) == 3, positional_arguments return parser.Call( container = container, hint = parser.Date(parser = parser), parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'Date', parser = parser, ), ) elif function_name == 'datetime64': assert len(positional_arguments) == 1, positional_arguments argument = positional_arguments[0] assert argument.guess(parser.Date) is not None or argument.guess(parser.Instant) is not None, argument return argument elif function_name == 'len': assert len(positional_arguments) == 1, positional_arguments return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'length', parser = parser, ), ) elif function_name == 'max_': return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'max', parser 
= parser, ), ) elif function_name == 'min_': return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'min', parser = parser, ), ) elif function_name == 'not_': assert len(positional_arguments) == 1, positional_arguments value = positional_arguments[0] return parser.NotTest( container = container, value = parser.ParentheticalExpression( container = container, parser = parser, value = value.testize(allow_array = True), ), parser = parser, ) elif function_name == 'or_': assert len(positional_arguments) == 2, positional_arguments left, right = positional_arguments return parser.ParentheticalExpression( container = container, parser = parser, value = parser.Expression( container = container, operands = [ parser.ParentheticalExpression( container = container, parser = parser, value = left.testize(allow_array = True), ), parser.ParentheticalExpression( container = container, parser = parser, value = right.testize(allow_array = True), ), ], operator = u'|', parser = parser, ), ) elif function_name == 'round_': assert 1 <= len(positional_arguments) <= 2, positional_arguments return parser.Call( container = container, hint = positional_arguments[0].hint, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'round', parser = parser, ), ) elif function_name == 'startswith': assert len(positional_arguments) == 2, positional_arguments assert len(named_arguments) == 0, named_arguments return parser.Call( container = container, hint = self.hint, parser = parser, positional_arguments = positional_arguments, subject = parser.Variable( name = u'beginswith', parser = parser, ), ) elif function_name == 'xor_': assert len(positional_arguments) == 2, positional_arguments left, right = positional_arguments return parser.ParentheticalExpression( container = container, parser = parser, value = parser.XorExpression( container = container, operands 
= [ parser.ParentheticalExpression( container = container, parser = parser, value = left.testize(allow_array = True), ), parser.ParentheticalExpression( container = container, parser = parser, value = right.testize(allow_array = True), ), ], operator = u'$', parser = parser, ), ) elif function_name == u'zeros': assert len(positional_arguments) == 1, positional_arguments length = positional_arguments[0].juliaize() assert len(named_arguments) <= 1, named_arguments dtype_wrapper = named_arguments.get('dtype') if dtype_wrapper is None: cell_type = None else: dtype_wrapper = dtype_wrapper.guess(parser.String) or dtype_wrapper.guess(parser.Type) cell_type = dtype_wrapper.value cell_wrapper = parser.get_cell_wrapper(container = self.container, type = cell_type) return parser.Call( container = container, parser = parser, positional_arguments = [ cell_wrapper.typeize(), length, ], subject = parser.Variable( name = u'zeros', parser = parser, ), ) return self.__class__( container = container, hint = self.hint, keyword_argument = keyword_argument, named_arguments = named_arguments, parser = parser, positional_arguments = positional_arguments, star_argument = star_argument, subject = subject, ) def source_julia(self, depth = 0): star = ([u'{}...'.format(self.star_argument.source_julia(depth = depth + 2))] if self.star_argument is not None else []) keyword = ([u'{}...'.format(self.keyword_argument.source_julia(depth = depth + 2))] if self.keyword_argument is not None else []) arguments_str = [ argument_value.source_julia(depth = depth) for argument_value in self.positional_arguments ] + star + [ u'{} = {}'.format(argument_name, argument_value.source_julia(depth = depth)) for argument_name, argument_value in self.named_arguments.iteritems() ] + keyword return u"{}({})".format( self.subject.source_julia(depth = depth), u', '.join(arguments_str), ) class Class(JuliaCompilerMixin, formulas_parsers_2to3.Class): pass class Comparison(JuliaCompilerMixin, 
formulas_parsers_2to3.Comparison): def juliaize(self): container = self.container parser = self.parser right = self.right.juliaize() if self.operator in (u'in', u'not in'): if right.guess(parser.CompactNode) is not None or right.guess(parser.StemNode) is not None \ or right.guess(parser.UniformDictionary) is not None: return self.__class__( container = container, hint = self.hint, left = self.left.juliaize(), operator = self.operator, parser = parser, right = parser.Call( container = container, parser = parser, positional_arguments = [right], subject = parser.Variable( name = u'keys', parser = parser, ), ), ) return self.__class__( container = container, hint = self.hint, left = self.left.juliaize(), operator = self.operator, parser = parser, right = right, ) def source_julia(self, depth = 0): operator = self.operator if operator == u'not in': return u'!({} in {})'.format(self.left.source_julia(depth = depth), self.right.source_julia(depth = depth)) if operator == u'is': operator = u'===' elif operator == u'is not': operator = u'!==' elif operator in (u'==', u'>', u'>=', u'<', u'<=', u'!=') and self.guess(self.parser.Array) is not None: operator = u'.{}'.format(operator) return u'{} {} {}'.format(self.left.source_julia(depth = depth), operator, self.right.source_julia(depth = depth)) class Continue(JuliaCompilerMixin, formulas_parsers_2to3.Continue): def juliaize(self): return self # Conversion of variable to Julia is done only once, during assignment. 
# NOTE(review): this span was recovered from a whitespace-flattened copy of the module. Statement nesting below was
# reconstructed from syntax alone, and runs of spaces inside string literals may have been collapsed (see the
# indent-unit notes further down) -- confirm both against the upstream source before relying on them.

# Last method of ``Continue``; its class header and ``juliaize`` come just before this span.
def source_julia(self, depth = 0):
    """Return the literal text ``continue``; ``depth`` is not used."""
    return u'continue'


class Expression(JuliaCompilerMixin, formulas_parsers_2to3.Expression):
    """Wrapper with a list of ``operands`` joined by a single ``operator``, able to emit Julia source."""

    def juliaize(self):
        """Return a new wrapper of the same class with each operand passed through ``juliaize()`` and then
        ``testize(allow_array = True)``; container, hint, operator and parser are carried over unchanged.
        """
        return self.__class__(
            container = self.container,
            hint = self.hint,
            operands = [
                operand.juliaize().testize(allow_array = True)
                for operand in self.operands
                ],
            operator = self.operator,
            parser = self.parser,
            )

    def source_julia(self, depth = 0):
        """Return the operands' sources joined by the operator surrounded by one space on each side."""
        return u' {} '.format(self.operator).join(
            operand.source_julia(depth = depth)
            for operand in self.operands
            )


class Factor(JuliaCompilerMixin, formulas_parsers_2to3.Factor):
    """Wrapper for a prefix ``operator`` applied to a single ``operand``."""

    def juliaize(self):
        """Return a new wrapper of the same class with the operand juliaized."""
        return self.__class__(
            container = self.container,
            hint = self.hint,
            operand = self.operand.juliaize(),
            operator = self.operator,
            parser = self.parser,
            )

    def source_julia(self, depth = 0):
        """Return the operator immediately followed by the operand's source (no separator)."""
        return u'{}{}'.format(self.operator, self.operand.source_julia(depth = depth))


class For(JuliaCompilerMixin, formulas_parsers_2to3.For):
    """Wrapper for a ``for`` loop: loop variables, an iterator expression and a body of statements."""

    def juliaize(self):
        """Juliaize the loop and return a new wrapper of the same class.

        Side effect: every variable of ``variable_by_name`` that has a value gets that value replaced, in place,
        by its juliaized form. The same ``variable_by_name`` mapping object is then handed to the new wrapper.
        """
        parser = self.parser
        for variable in self.variable_by_name.itervalues():
            if variable.value is not None:
                variable.value = variable.value.juliaize()
        return self.__class__(
            container = self.container,
            hint = self.hint,
            body = [
                statement.juliaize()
                for statement in self.body
                ],
            iterator = self.iterator.juliaize(),
            parser = parser,
            variable_by_name = self.variable_by_name,
            )

    def source_julia(self, depth = 0):
        """Return ``for <variables> in <iterator>``, one body statement per line, then ``end``.

        Several loop variables are rendered as a parenthesized, comma-separated tuple; a single one is rendered
        bare. An empty ``variable_by_name`` would raise IndexError on ``variables_name[0]``.
        """
        variables_name = list(self.variable_by_name.iterkeys())
        # NOTE(review): the indent unit reads as a single space here, while Function.source_julia_statements strips
        # 4 characters per level (``line[4:]``) -- the literal may have lost spaces in extraction; confirm.
        return u'for {variables} in {iterator}\n{body}{indent}end'.format(
            body = u''.join(
                u'{}{}\n'.format(u' ' * (depth + 1), statement.source_julia(depth = depth + 1))
                for statement in self.body
                ),
            indent = u' ' * depth,
            iterator = self.iterator.source_julia(depth = depth + 1),
            variables = u'({})'.format(u', '.join(variables_name))
                if len(variables_name) > 1
                else variables_name[0],
            )


class Formula(JuliaCompilerMixin, formulas_parsers_2to3.Formula):
    def juliaize(self):
        # A formula is never used as is. Only its attributes are used, so it is not converted to Julia.
        return self


class Function(JuliaCompilerMixin, formulas_parsers_2to3.Function):
    """Wrapper for a function definition (parameters, body, returns, local variables)."""

    def juliaize(self):
        """Juliaize the function and return a new wrapper of the same class.

        Side effect: as in ``For.juliaize``, the values of the variables in ``variable_by_name`` are juliaized in
        place and the mapping object itself is shared with the returned wrapper. Body statements, default values
        of named parameters and ``returns`` (when non-empty, otherwise ``None``) are juliaized; names are copied.
        """
        parser = self.parser
        for variable in self.variable_by_name.itervalues():
            if variable.value is not None:
                variable.value = variable.value.juliaize()
        return self.__class__(
            container = self.container,
            hint = self.hint,
            body = [
                statement.juliaize()
                for statement in self.body
                ],
            keyword_name = self.keyword_name,
            name = self.name,
            named_parameters = collections.OrderedDict(
                (name, value.juliaize())
                for name, value in self.named_parameters.iteritems()
                ),
            parser = parser,
            positional_parameters = self.positional_parameters,
            returns = [
                statement.juliaize()
                for statement in self.returns
                ] if self.returns else None,
            star_name = self.star_name,
            variable_by_name = self.variable_by_name,
            )

    def source_julia(self, depth = 0):
        """Return the text of a ``function <name>(<positional>; <named>) ... end`` definition.

        Positional parameters come first, followed by ``<star_name>...`` when set. Named parameters are rendered
        as ``name = default`` followed by ``<keyword_name>...`` when set, and are introduced by ``; `` only when
        there is at least one. The result starts and ends with a newline.
        """
        positional_parameters = []
        if self.positional_parameters:
            positional_parameters.extend(self.positional_parameters)
        if self.star_name:
            positional_parameters.append(u'{}...'.format(self.star_name))
        named_parameters = []
        if self.named_parameters:
            # NOTE(review): this is the only format string in the class without the ``u`` prefix. Under Python 2 a
            # byte-string ``format`` given non-ASCII unicode arguments raises UnicodeEncodeError -- presumably an
            # oversight; confirm before changing.
            named_parameters.extend(
                '{} = {}'.format(name, value.source_julia(depth = depth + 2))
                for name, value in self.named_parameters.iteritems()
                )
        if self.keyword_name:
            named_parameters.append(u'{}...'.format(self.keyword_name))
        return u'\n{indent}function {name}({positional_parameters}{named_parameters})\n{body}{indent}end\n'.format(
            body = self.source_julia_statements(depth = depth + 1),
            indent = u' ' * depth,
            name = self.name,
            named_parameters = u'; {}'.format(u', '.join(named_parameters)) if named_parameters else u'',
            positional_parameters = u', '.join(positional_parameters),
            )

    def source_julia_statements(self, depth = 0):
        """Return the body statements as text, one per line, each prefixed with ``depth`` indent units.

        Side effect: a body statement that is a ``parser.String`` has its ``value`` rewritten (stripped and, when
        multi-line, re-indented) before being rendered.
        """
        parser = self.parser
        statements = []
        for statement in self.body:
            if isinstance(statement, parser.String):
                # Strip and reindent docstring.
                value = statement.value.strip()
                if u'\n' in value:
                    lines = value.split(u'\n')
                    # Remove one level of leading indentation (4 characters) from every line but the first, and
                    # repeat for as long as all non-empty continuation lines are still indented.
                    # NOTE(review): the prefix tested is a single space but 4 characters are removed -- the
                    # literal was probably 4 spaces before extraction; confirm against upstream.
                    while all(index == 0 or not line or line.startswith(u' ') for index, line in enumerate(lines)):
                        lines = [
                            (line[4:] if line else u'') if index > 0 else line
                            for index, line in enumerate(lines)
                            ]
                        if any(line.startswith(u' ') for line in lines):
                            continue
                        break
                    # Re-indent non-empty lines to the current depth; the outer strip() drops the indentation
                    # that was just put in front of the first line.
                    value = u'\n'.join(
                        u'{}{}'.format(u' ' * depth, line) if line else u''
                        for line in lines
                        ).strip()
                    if u'\n' in value:
                        # Still multi-line: end with a newline plus indentation.
                        value += u'\n{}'.format(u' ' * depth)
                statement.value = value
            statements.append(u'{}{}\n'.format(u' ' * depth, statement.source_julia(depth = depth)))
        return u''.join(statements)


class FunctionFileInput(JuliaCompilerMixin, formulas_parsers_2to3.FunctionFileInput):
    @classmethod
    def parse(cls, function, parser = None):
        """Parse ``function`` with the parent implementation, register the resulting wrapper in
        ``parser.non_formula_function_by_name`` under its name, and return it.

        NOTE(review): ``parser`` defaults to ``None`` but is dereferenced unconditionally, so callers presumably
        always pass one -- confirm.
        """
        function_wrapper = super(FunctionFileInput, cls).parse(function, parser = parser)
        parser.non_formula_function_by_name[function_wrapper.name] = function_wrapper
        return function_wrapper


class If(JuliaCompilerMixin, formulas_parsers_2to3.If):
    """Wrapper for a conditional; ``items`` is a sequence of ``(test, body)`` pairs, ``test`` being ``None`` for
    the ``else`` branch.
    """

    def juliaize(self):
        """Return a new wrapper of the same class whose tests went through ``juliaize().testize()`` (``None``
        tests are kept as ``None``) and whose body statements are juliaized.
        """
        return self.__class__(
            container = self.container,
            hint = self.hint,
            items = [
                (
                    test.juliaize().testize() if test is not None else None,
                    [
                        statement.juliaize()
                        for statement in body
                        ],
                    )
                for test, body in self.items
                ],
            parser = self.parser,
            )

    def source_julia(self, depth = 0):
        """Return the ``if`` / ``elseif`` / ``else`` chain followed by ``end``.

        The first branch keyword is emitted without indentation; ``elseif``, ``else`` and the closing ``end`` are
        prefixed with ``depth`` indent units, and body statements with ``depth + 1``.
        """
        return u''.join(
            u'{word}{test}\n{body}'.format(
                body = u''.join(
                    u'{}{}\n'.format(u' ' * (depth + 1), statement.source_julia(depth = depth + 1))
                    for statement in body
                    ),
                test = u' {}'.format(test.source_julia(depth = depth + 2)) if test is not None else u'',
                # u'if' has no placeholder, so format() leaves it unindented.
                word = (u'{}else' if test is None else u'if' if index == 0 else u'{}elseif').format(u' ' * depth),
                )
            for index, (test, body) in enumerate(self.items)
            ) + u'{}end'.format(u' ' * depth)


class Instant(JuliaCompilerMixin, formulas_parsers_2to3.Instant):
    def juliaize(self):
        """Return ``self`` unchanged."""
        return self


class Key(JuliaCompilerMixin, formulas_parsers_2to3.Key):
    """Wrapper for a subscript access ``subject[value]``."""

    def juliaize(self):
        """Juliaize subject and value and return a new wrapper of the same class.

        When the juliaized subject can be guessed as a ``parser.CompactNode``, a fresh hint is derived:

        * key not resolvable to a string -> ``parser.StemNode``;
        * otherwise the child entry found under ``value['children'][key]`` is inspected and its ``'@type'``
          decides the hint: ``Node`` -> ``CompactNode``, ``Parameter`` -> ``Boolean`` (when its ``format`` is
          ``'boolean'``) or ``Number``, anything else must be ``Scale`` -> ``TaxScale``.

        In every other case the wrapper's current ``hint`` is kept.
        """
        parser = self.parser
        subject = self.subject.juliaize()
        parent_node = subject.guess(parser.CompactNode)
        if parent_node is not None:
            value = self.value.guess(parser.String)
            key = value.value if value is not None else None
            if key is None:
                hint = parser.StemNode(
                    is_reference = parent_node.is_reference,
                    parent = parent_node,
                    parser = parser,
                    )
            else:
                key = unicode(key)
                node_value = parent_node.value['children'].get(key)
                if node_value is None:
                    # Dirty hack for tax_hab formula.
                    # A missing 'taux' falls back to 'taux_plein'; a missing 'taux_plein' / 'taux_reduit' falls
                    # back to 'taux'. Any other missing key trips the assertion.
                    if key == u'taux':
                        node_value = parent_node.value['children']['taux_plein']
                    else:
                        assert key in (u'taux_plein', u'taux_reduit'), key
                        node_value = parent_node.value['children']['taux']
                node_type = node_value['@type']
                if node_type == u'Node':
                    hint = parser.CompactNode(
                        is_reference = parent_node.is_reference,
                        name = unicode(key),
                        parent = parent_node,
                        parser = parser,
                        value = node_value,
                        )
                elif node_type == u'Parameter':
                    if node_value.get('format') == 'boolean':
                        hint = parser.Boolean(
                            parser = parser,
                            )
                    else:
                        hint = parser.Number(
                            parser = parser,
                            )
                else:
                    assert node_type == u'Scale'
                    hint = parser.TaxScale(
                        parser = parser,
                        )
            return self.__class__(
                container = self.container,
                hint = hint,
                parser = parser,
                subject = subject,
                value = self.value.juliaize(),
                )
        else:
            parent_node = subject.guess(parser.StemNode)
            if parent_node is not None:
                # NOTE(review): this StemNode hint is built but the return below passes ``self.hint``, so it is
                # never used -- looks like the intent was to return it as the hint; confirm against upstream.
                hint = parser.StemNode(
                    is_reference = parent_node.is_reference,
                    parent = parent_node,
                    parser = parser,
                    )
        return self.__class__(
            container = self.container,
            hint = self.hint,
            parser = parser,
            subject = subject,
            value = self.value.juliaize(),
            )

    def source_julia(self, depth = 0):
        """Return ``<subject>[<value>]``."""
        return u"{}[{}]".format(
            self.subject.source_julia(depth = depth),
            self.value.source_julia(depth = depth),
            )


class Lambda(JuliaCompilerMixin, formulas_parsers_2to3.Lambda):
    """Wrapper for an anonymous function with positional parameters and a single expression."""

    def juliaize(self):
        """Return a new wrapper of the same class with a juliaized expression and a rebuilt variable mapping.

        Unlike ``For`` / ``Function``, the variables are not modified in place: each one is cloned (same class,
        container, hint and name, juliaized value when it has one) and the clone is juliaized in turn.
        """
        parser = self.parser
        # Convert variables to Julia only once, during assignment.
        variable_by_name = collections.OrderedDict(
            (
                name,
                variable.__class__(
                    container = variable.container,
                    hint = variable.hint,
                    name = variable.name,
                    parser = parser,
                    value = variable.value.juliaize() if variable.value is not None else None,
                    ).juliaize(),
                )
            for name, variable in self.variable_by_name.iteritems()
            )
        return self.__class__(
            container = self.container,
            hint = self.hint,
            expression = self.expression.juliaize(),
            parser = parser,
            positional_parameters = self.positional_parameters,
            variable_by_name = variable_by_name,
            )

    def source_julia(self, depth = 0):
        """Return ``(<parameters>) -> <expression>`` with comma-separated parameter names."""
        return u'({parameters}) -> {expression}'.format(
            expression = self.expression.source_julia(depth = depth + 1),
            parameters = u', '.join(self.positional_parameters),
            )


class List(JuliaCompilerMixin, formulas_parsers_2to3.List):
    """Wrapper for a list literal; ``value`` holds the item wrappers."""

    def juliaize(self):
        """Return a new wrapper of the same class with every item juliaized."""
        return self.__class__(
            container = self.container,
            hint = self.hint,
            parser = self.parser,
            value = [
                item.juliaize()
                for item in self.value
                ],
            )

    def source_julia(self, depth = 0):
        """Return the items' sources, comma-separated, between square brackets."""
        return u'[{}]'.format(u', '.join(
            item.source_julia(depth = depth)
            for item in self.value
            ))


class NoneWrapper(JuliaCompilerMixin, formulas_parsers_2to3.NoneWrapper):
    def juliaize(self):
        """Return ``self`` unchanged."""
        return self

    def source_julia(self, depth = 0):
        """Return the literal text ``nothing``."""
        return u'nothing'


class NotTest(JuliaCompilerMixin, formulas_parsers_2to3.NotTest):
    """Wrapper for a logical negation of ``value``."""

    def juliaize(self):
        """Return a new wrapper of the same class whose value went through ``juliaize().testize()``."""
        return self.__class__(
            container = self.container,
            hint = self.hint,
            parser = self.parser,
            value = self.value.juliaize().testize(),
            )

    def source_julia(self, depth = 0):
        """Return ``!<value>``; a directly nested ``NotTest`` cancels out and only the inner value is emitted."""
        parser = self.parser
        value = self.value
        if isinstance(value, parser.NotTest):
            return value.value.source_julia(depth = depth)
        return u"!{}".format(value.source_julia(depth = depth))


class Number(JuliaCompilerMixin, formulas_parsers_2to3.Number):
    def juliaize(self):
        """Return ``self`` unchanged."""
        return self

    def source_julia(self, depth = 0):
        """Return ``unicode(self.value)``."""
        return unicode(self.value)

    def typeize(self):
        """Return a ``parser.Variable`` named after the type matching ``self.type``.

        ``None`` and ``np.float32`` map to ``Float32``, ``np.int16`` to ``Int16`` and ``np.int32`` to ``Int32``.
        Any other ``self.type`` raises KeyError.
        """
        parser = self.parser
        return parser.Variable(
            name = {
                None: u'Float32',
                np.float32: u'Float32',
                np.int16: u'Int16',
                np.int32: u'Int32',
                }[self.type],
            parser = parser,
            )


class ParentheticalExpression(JuliaCompilerMixin, formulas_parsers_2to3.ParentheticalExpression):
    """Wrapper for an expression enclosed in parentheses."""

    def juliaize(self):
        """Return a new wrapper of the same class with the inner value juliaized."""
        return self.__class__(
            container = self.container,
            hint = self.hint,
            parser = self.parser,
            value = self.value.juliaize(),
            )

    def source_julia(self, depth = 0):
        """Return ``(<value>)``, omitting the parentheses when the value is a ``NotTest``, ``Number``,
        ``ParentheticalExpression`` or ``Variable``.
        """
        parser = self.parser
        value = self.value
        if isinstance(value, (parser.NotTest, parser.Number, parser.ParentheticalExpression, parser.Variable)):
            return value.source_julia(depth = depth)
        return u"({})".format(value.source_julia(depth = depth))


class Period(JuliaCompilerMixin, formulas_parsers_2to3.Period):
    def juliaize(self):
        """Return ``self`` unchanged."""
        return self


class Return(JuliaCompilerMixin, formulas_parsers_2to3.Return):
    """Wrapper for a ``return`` statement."""

    def juliaize(self):
        """Return a new wrapper of the same class with the returned value juliaized."""
        return self.__class__(
            container = self.container,
            hint = self.hint,
            parser = self.parser,
            value = self.value.juliaize(),
            )

    def source_julia(self, depth = 0):
        """Return ``return <value>``; a ``parser.Tuple`` value is emitted as bare comma-separated items, without
        the parentheses that ``Tuple.source_julia`` would add.
        """
        if isinstance(self.value, self.parser.Tuple):
            return u'return {}'.format(u', '.join(
                item.source_julia(depth = depth)
                for item in self.value.value
                ))
        return u"return {}".format(self.value.source_julia(depth = depth))


class Role(JuliaCompilerMixin, formulas_parsers_2to3.Role):
    def juliaize(self):
        return self  # A role never appears in formulas => juliaize is a fake one.


class Simulation(JuliaCompilerMixin, formulas_parsers_2to3.Simulation):
    def juliaize(self):
        """Return ``self`` unchanged."""
        return self


class String(JuliaCompilerMixin, formulas_parsers_2to3.String):
    def juliaize(self):
        """Return ``self`` unchanged."""
        return self

    def source_julia(self, depth = 0):
        """Return the result of ``generate_string_julia_source`` applied to ``self.value``."""
        return generate_string_julia_source(self.value)


class TaxScale(JuliaCompilerMixin, formulas_parsers_2to3.TaxScale):
    def juliaize(self):
        return self  # A tax-scale never appears in formulas => juliaize is a fake one.
class Term(JuliaCompilerMixin, formulas_parsers_2to3.Term):
    """Julia flavour of a term.

    ``items`` is walked by position: even positions are wrappers (they are juliaized and asked for their Julia
    source), odd positions are kept untouched and printed as operators.
    """

    def juliaize(self):
        """Return a new term of the same class whose even-positioned items are juliaized."""
        juliaized_items = []
        for position, item in enumerate(self.items):
            is_operator = position % 2 == 1
            juliaized_items.append(item if is_operator else item.juliaize())
        return self.__class__(
            container = self.container,
            hint = self.hint,
            items = juliaized_items,
            parser = self.parser,
            )

    def source_julia(self, depth = 0):
        """Return the Julia source text of this term.

        A three-item term whose operator is ``//`` is printed as a ``div(left, right)`` call. In every other
        case the items are joined with spaces, each operator being prefixed with a dot when the term is guessed
        to be an ``Array``.
        """
        items = self.items
        if len(items) == 3 and items[1] == u'//':
            # NOTE(review): Python's "//" floors whereas Julia's div() truncates toward zero, so results differ
            # for operands of opposite signs -- confirm whether fld() was intended.
            left_source = items[0].source_julia(depth = depth)
            right_source = items[2].source_julia(depth = depth)
            return u'div({}, {})'.format(left_source, right_source)

        is_array_expression = self.guess(self.parser.Array) is not None
        fragments = []
        for position, item in enumerate(items):
            if position % 2 == 0:
                fragments.append(item.source_julia(depth = depth))
            elif is_array_expression:
                # Dotted operator for array operands.
                fragments.append(u'.{}'.format(item))
            else:
                fragments.append(item)
        return u' '.join(fragments)
class Tuple(JuliaCompilerMixin, formulas_parsers_2to3.Tuple):
    """Julia flavour of a tuple literal; ``value`` holds the wrappers of the tuple items."""

    def juliaize(self):
        """Return a new wrapper of the same class whose items are all juliaized (stored as a Python tuple)."""
        return self.__class__(
            container = self.container,
            hint = self.hint,
            parser = self.parser,
            value = tuple(
                item.juliaize()
                for item in self.value
                ),
            )

    def source_julia(self, depth = 0):
        """Return the Julia source text of this tuple.

        Items are printed in order, separated by ``", "`` and enclosed in parentheses. A single-item tuple
        is printed with a trailing comma.
        """
        items_source = [
            item.source_julia(depth = depth)
            for item in self.value
            ]
        if len(items_source) == 1:
            # As in Python, Julia reads "(x)" as a parenthesized expression; only "(x,)" is a 1-tuple.
            return u'({},)'.format(items_source[0])
        return u'({})'.format(u', '.join(items_source))
class XorExpression(JuliaCompilerMixin, formulas_parsers_2to3.XorExpression):
    """Julia flavour of an expression made of several operands joined by one operator."""

    def juliaize(self):
        """Return a new wrapper of the same class with juliaized operands.

        Each operand is juliaized, then passed through ``testize(allow_array = True)``. The operator is
        copied unchanged.
        """
        juliaized_operands = []
        for operand in self.operands:
            juliaized_operands.append(operand.juliaize().testize(allow_array = True))
        return self.__class__(
            container = self.container,
            hint = self.hint,
            operands = juliaized_operands,
            operator = self.operator,
            parser = self.parser,
            )

    def source_julia(self, depth = 0):
        """Return the Julia source of the operands, joined by the operator surrounded by single spaces."""
        separator = u' {} '.format(self.operator)
        operands_source = (
            operand.source_julia(depth = depth)
            for operand in self.operands
            )
        return separator.join(operands_source)
call = parser.source_julia_column_without_function(is_formula = True), ) if parser.column.name == 'age_en_mois': return textwrap.dedent(u""" {call} do simulation, variable, period if !isempty(variable.array_by_period) for (last_period, last_array) in sort(collect(variable.array_by_period), rev = true) last_start = last_period.start if day(last_start) == day(start) return period, last_array .+ ((year(start) - year(last_start)) * 12 + (month(start) - month(last_start))) end end has_birth = !isempty(get_variable!(simulation, "birth").array) if !has_birth has_age = !isemtpy(get_variable!(simulation, "age").array_by_period) if has_age return period, calculate(simulation, "age", period) * 12 end end @calculate(birth, period) return period, Int[ (year(period.start) - year(birth_cell)) * 12 + month(period.start) - month(birth_cell) for birth_cell in calculate(simulation, "birth", period) ] end """).format( call = parser.source_julia_column_without_function(is_formula = True), ) if parser.column.name == 'cmu_c_plafond': return textwrap.dedent(u""" {call} do simulation, variable, period period = MonthPeriod(firstdayofmonth(period.start)) @calculate(age, period) @calculate(alt, period) @calculate(cmu_eligible_majoration_dom, period) # @calculate(cmu_nbp_foyer, period) P = legislation_at(simulation, period.start)["cmu"] PAC = vcat([PART], ENFS) # Calcul du coefficient personnes à charge, avec prise en compte de la garde alternée # Tableau des coefficients coefficients_array = vcat( [P["coeff_p2"], P["coeff_p3_p4"], P["coeff_p3_p4"], P["coeff_p5_plus"]], zeros(length(PAC) - 4), ) # Tri des personnes à charge, le conjoint en premier, les enfants par âge décroissant age_by_role = split_person_by_role(age, get_entity(variable), period, PAC) alt_by_role = split_person_by_role(alt, get_entity(variable), period, PAC) age_and_alt_matrix = hcat([ (role == PART) * 10000 .+ age_by_role[role] .* 10 .+ alt_by_role[role] .- (age_by_role[role] .< 0) .* 999999 for role in 
sort(collect(keys(age_by_role))) ]...) for row_index in 1:size(age_and_alt_matrix, 1) age_and_alt_matrix[row_index, :] = sort(age_and_alt_matrix[row_index, :], 2, rev = true) end # Calcule weighted_alt_matrix, qui vaut 0.5 pour les enfants en garde alternée, 1 sinon. present_matrix = age_and_alt_matrix .>= 0 alt_matrix = (age_and_alt_matrix .% 10) .* present_matrix weighted_alt_matrix = present_matrix .- alt_matrix .* 0.5 # Calcul final du coefficient coeff_pac = weighted_alt_matrix * coefficients_array return period, P["plafond_base"] .* (1 .+ cmu_eligible_majoration_dom .* P["majoration_dom"]) .* (1 .+ coeff_pac) end """).format( call = parser.source_julia_column_without_function(is_formula = True), ) if parser.column.name == 'nombre_jours_calendaires': return textwrap.dedent(u""" {call} do simulation, variable, period period = MonthPeriod(firstdayofmonth(period.start)) @calculate(contrat_de_travail_arrivee, period) @calculate(contrat_de_travail_depart, period) debut_mois = firstdayofmonth(period.start) fin_mois = lastdayofmonth(period.start) jours_travailles = max( int(min(contrat_de_travail_depart, fin_mois)) .- int(max(contrat_de_travail_arrivee, debut_mois)) .+ 1, 0, ) return period, jours_travailles end """).format( call = parser.source_julia_column_without_function(is_formula = True), ) if parser.column.name == 'zone_apl': del parser.non_formula_function_by_name['preload_zone_apl'] return textwrap.dedent(u""" {call} do simulation, variable, period @calculate(depcom, period) return period, [ get(zone_apl_by_depcom, depcom_cell, 2) for depcom_cell in depcom ] end zone_apl_by_depcom = nothing function preload_zone_apl() global zone_apl_by_depcom if zone_apl_by_depcom === nothing module_dir = Pkg.dir("OpenFiscaFrance") array = readcsv(joinpath(module_dir, "assets/apl/20110914_zonage.csv"), String) zone_apl_by_depcom = [ # Keep only first char of Zonage column because of 1bis value considered equivalent to 1. 
depcom => Convertible(string(zone_apl_string[1])) |> input_to_int |> to_value for (depcom, zone_apl_string) in zip(array[2:end, 1], array[2:end, 5]) ] commune_depcom_by_subcommune_depcom = JSON.parsefile( joinpath(module_dir, "assets/apl/commune_depcom_by_subcommune_depcom.json")) for (subcommune_depcom, commune_depcom) in commune_depcom_by_subcommune_depcom zone_apl_by_depcom[subcommune_depcom] = zone_apl_by_depcom[commune_depcom] end end end """).format( call = parser.source_julia_column_without_function(is_formula = True), ) statements = None for variable in self.variable_by_name.itervalues(): if isinstance(variable.value, parser.FormulaFunction): # Simple formula statements = variable.value.juliaize().source_julia_statements(depth = depth + 1) break else: # Dated formula dated_functions_decorator = [ variable.value for variable in self.variable_by_name.itervalues() if isinstance(variable.value, parser.Decorator) and variable.name == 'dated_function' ] assert dated_functions_decorator statements_blocks = [] for decorator in dated_functions_decorator: call = decorator.subject assert isinstance(call, parser.Call) assert call.keyword_argument is None assert call.star_argument is None start_date = call.positional_arguments[0] \ if len(call.positional_arguments) >= 1 \ else call.named_arguments.get('start') stop_date = call.positional_arguments[1] \ if len(call.positional_arguments) >= 2 \ else call.named_arguments.get('stop') assert start_date is not None or stop_date is not None if start_date is None: test = u'{optional_else}if period.start <= {stop_date}'.format( optional_else = u'else' if statements_blocks else u'', stop_date = stop_date.juliaize().source_julia(depth = depth + 2), ) elif stop_date is None: test = u'{optional_else}if {start_date} <= period.start'.format( optional_else = u'else' if statements_blocks else u'', start_date = start_date.juliaize().source_julia(depth = depth + 2), ) else: test = u'{optional_else}if {start_date} <= period.start && 
period.start <= {stop_date}'.format( optional_else = u'else' if statements_blocks else u'', start_date = start_date.juliaize().source_julia(depth = depth + 2), stop_date = stop_date.juliaize().source_julia(depth = depth + 2), ) function = decorator.decorated assert isinstance(function, parser.FormulaFunction) statements_blocks.append(u"{indent} {test}\n{statements}".format( indent = u' ' * depth, statements = function.juliaize().source_julia_statements(depth = depth + 2), test = test, )) statements_blocks.append(textwrap.dedent(u"""\ {indent} else {indent} return period, default_array(variable) {indent} end """).format( indent = u' ' * depth, )) statements = u''.join(statements_blocks) return textwrap.dedent(u""" {call} do simulation, variable, period {statements}end """).format( call = parser.source_julia_column_without_function(is_formula = True), statements = statements or u'', ) class FormulaFunction(Function, formulas_parsers_2to3.FormulaFunction): pass # Julia parser & compiler class Parser(formulas_parsers_2to3.Parser): AndExpression = AndExpression AndTest = AndTest ArithmeticExpression = ArithmeticExpression Array = Array Assert = Assert Assignment = Assignment Attribute = Attribute Boolean = Boolean Call = Call Class = Class Comparison = Comparison Continue = Continue Expression = Expression Factor = Factor For = For Formula = Formula FormulaClass = FormulaClass FormulaFunction = FormulaFunction Function = Function FunctionFileInput = FunctionFileInput If = If Instant = Instant Key = Key Lambda = Lambda List = List non_formula_function_by_name = None NoneWrapper = NoneWrapper NotTest = NotTest Number = Number ParentheticalExpression = ParentheticalExpression Period = Period Return = Return Role = Role Simulation = Simulation String = String TaxScale = TaxScale Term = Term Test = Test Tuple = Tuple UniformDictionary = UniformDictionary Variable = Variable XorExpression = XorExpression def __init__(self, country_package = None, driver = None, 
def juliaize_name(self, name):
    """Return ``name`` adapted so that it can be used as an identifier in the generated Julia source.

    ``function`` keeps its historical replacement ``func``. Any other Julia reserved word that is a legal
    Python identifier gets a trailing underscore, because emitting it verbatim would produce Julia code that
    does not parse. Every other name is returned unchanged.
    """
    if name == u'function':
        return u'func'
    # Julia reserved words that Python accepts as variable names. Words that are also reserved in Python
    # ("for", "if", "return"...) can't reach this method as names, so they are not listed.
    julia_only_reserved_words = (
        u'baremodule',
        u'begin',
        u'catch',
        u'const',
        u'do',
        u'elseif',
        u'end',
        u'export',
        u'false',
        u'let',
        u'local',
        u'macro',
        u'module',
        u'quote',
        u'true',
        u'using',
        )
    if name in julia_only_reserved_words:
        return u'{}_'.format(name)
    # elif name.endswith(u'_holder'):
    #     name = name[:-len(u'_holder')]
    return name
isinstance(default, datetime.date): default_str = u"Date({}, {}, {})".format(default.year, default.month, default.day) elif isinstance(default, (float, int)): default_str = str(default) elif default == '': default_str = '""' else: assert default is None, "Unexpected default value: {} (type: {})".format(default, type(default)) enum = column.__dict__.get('enum') if enum is None: values_str = None else: values_str = u"[\n{} ]".format(u''.join( u' "{}" => {},\n'.format(symbol, index + (1 if increment_role else 0)) for index, symbol in sorted( (index1, symbol1) for symbol1, index1 in enum ) )) law_reference = column.law_reference if isinstance(law_reference, basestring): law_reference_str = u'"{}"'.format(law_reference) elif isinstance(law_reference, list): law_reference_str = u'[{}]'.format(u', '.join( u'"{}"'.format(field) for field in law_reference )) else: assert law_reference is None law_reference_str = None # max_length = column.__dict__.get('max_length') TODO? start_date = column.start stop_date = column.end if column.name in ('age', 'age_en_mois'): assert default_str is None, default_str default_str = u"-9999" value_at_period_to_cell = textwrap.dedent(u"""\ variable_definition::VariableDefinition -> pipe( value_at_period_to_integer(variable_definition), first_match( test_greater_or_equal(0), test_equal(-9999), ), ) """).strip().replace(u'\n', u'\n ') elif column.name == 'depcom': value_at_period_to_cell = textwrap.dedent(u"""\ variable_definition::VariableDefinition -> pipe( condition( test_isa(Integer), call(string), test_isa(String), noop, fail(error = N_("Unexpected type for Insee depcom.")), ), condition( test(value -> length(value) == 4), call(value -> string('0', value)), ), test(value -> ismatch(r"^(\d{2}|2A|2B)\d{3}$", value), error = N_("Invalid Insee depcom format for commune.")), ) """).strip().replace(u'\n', u'\n ') else: value_at_period_to_cell = None formula_class = column.formula_class if issubclass(formula_class, formulas.AbstractEntityToEntity): 
base_function_str = u'entity_to_entity_period_value' elif formula_class.base_function.func_name in ( formulas.last_duration_last_value.func_name, formulas.missing_value.func_name, formulas.permanent_default_value.func_name, formulas.requested_period_default_value.func_name, formulas.requested_period_last_value.func_name, ): base_function_str = formula_class.base_function.func_name else: assert False, u"Unhandled base function in formula {}: {}".format(column.name, getattr(formula_class, 'base_function', None)).encode('utf-8') named_arguments = u''.join( u' {},\n'.format(named_argument) for named_argument in [ u'cell_default = {}'.format(default_str) if default_str is not None else None, u'cell_format = "{}"'.format(column.val_type) if column.val_type is not None else None, u'cerfa_field = {}'.format(cerfa_field_str) if cerfa_field_str is not None else None, (u"label = {}".format(generate_string_julia_source(column.label)) if column.label not in (u'', column.name) else None), u'law_reference = {}'.format(law_reference_str) if law_reference_str is not None else None, # u'max_length = {}'.format(max_length) if max_length is not None else None, TODO? 
def generate_date_range_value_julia_source(date_range_value_json):
    """Return the Julia ``DateRangeValue(...)`` constructor call matching a JSON date range value.

    ``date_range_value_json`` is a dictionary that may only contain the keys ``comment``, ``start``, ``stop``
    and ``value`` (any other key fails an assertion). ``start`` and ``stop`` are split on ``-`` and their
    three parts become the arguments of a Julia ``Date(...)`` call. The value is printed in lower case. A
    non-empty comment is appended as a ``comment = ...`` named argument.
    """
    allowed_keys = (
        'comment',
        'start',
        'stop',
        'value',
        )
    for key in date_range_value_json:
        assert key in allowed_keys, "Unexpected item key for date range value: {}".format(key)

    comment = date_range_value_json.get('comment')
    if comment:
        comment_argument = u', comment = {}'.format(generate_string_julia_source(comment))
    else:
        comment_argument = u''

    date_template = u'Date({}, {}, {})'
    start_date_source = date_template.format(*date_range_value_json['start'].split(u'-'))
    stop_date_source = date_template.format(*date_range_value_json['stop'].split(u'-'))
    # Lower-casing turns Python's True and False into Julia's true and false.
    value_source = u'{}'.format(date_range_value_json['value']).lower()

    return u'DateRangeValue({start_date}, {stop_date}, {value}{comment})'.format(
        comment = comment_argument,
        start_date = start_date_source,
        stop_date = stop_date_source,
        value = value_source,
        )
Parameter{{{type}}}( [ {values} ], {named_arguments})) """).format( name = u'.'.join(path_fragments), named_arguments = u''.join( u' {} = {},\n'.format(argument_name, argument_str) for argument_name, argument_str in named_arguments.iteritems() ), type = type_str, values = u''.join( u' {},\n'.format(generate_date_range_value_julia_source(date_range_value_json)) for date_range_value_json in reversed(node_json['values']) ), ) elif node_json['@type'] == 'Scale': for key in node_json.iterkeys(): assert key in ( '@type', 'brackets', 'comment', 'description', 'option', 'unit', ), "Unexpected item key for tax scale: {}".format(key) tax_scale_type = None brackets_julia_source = [] for bracket_json in node_json['brackets']: for bracket_key in bracket_json.iterkeys(): assert bracket_key in ( 'amount', 'base', 'rate', 'threshold', ), "Unexpected item key for bracket: {}".format(bracket_key) amount_json = bracket_json.get('amount') if amount_json is None: amount_julia_source = u'' else: if tax_scale_type is None: tax_scale_type = u'AmountScale' else: assert tax_scale_type == u'AmountScale' date_range_values_julia_source = u''.join( u' {},\n'.format(generate_date_range_value_julia_source(date_range_value_json)) for date_range_value_json in reversed(amount_json) ) amount_julia_source = u' amount = [\n{} ],\n'.format(date_range_values_julia_source) base_json = bracket_json.get('base') if base_json is None: base_julia_source = u'' else: date_range_values_julia_source = u''.join( u' {},\n'.format(generate_date_range_value_julia_source(date_range_value_json)) for date_range_value_json in reversed(base_json) ) base_julia_source = u' base = [\n{} ],\n'.format(date_range_values_julia_source) rate_json = bracket_json.get('rate') if rate_json is None: rate_julia_source = u'' else: if tax_scale_type is None: tax_scale_type = u'MarginalRateScale' else: assert tax_scale_type == u'MarginalRateScale' date_range_values_julia_source = u''.join( u' 
{},\n'.format(generate_date_range_value_julia_source(date_range_value_json)) for date_range_value_json in reversed(rate_json) ) rate_julia_source = u' rate = [\n{} ],\n'.format(date_range_values_julia_source) threshold_json = bracket_json.get('threshold') if threshold_json is None: threshold_julia_source = u'' else: date_range_values_julia_source = u''.join( u' {},\n'.format(generate_date_range_value_julia_source(date_range_value_json)) for date_range_value_json in reversed(threshold_json) ) threshold_julia_source = u' threshold = [\n{} ],\n'.format(date_range_values_julia_source) if tax_scale_type == 'AmountScale': bracket_julia_source = u' AmountBracket(\n{threshold}{amount}{base} ),\n'.format( amount = amount_julia_source, base = base_julia_source, threshold = threshold_julia_source, ) else: bracket_julia_source = u' RateBracket(\n{threshold}{rate}{base} ),\n'.format( base = base_julia_source, rate = rate_julia_source, threshold = threshold_julia_source, ) brackets_julia_source.append(bracket_julia_source) option = node_json.get('option') assert option in ( None, 'contrib', 'main-d-oeuvre', 'noncontrib', ), "Unexpected option for tax scale: {}".format(option) # TODO named_arguments = collections.OrderedDict() unit = node_json.get('unit') if unit is not None: named_arguments['unit'] = u'"{}"'.format(unit) named_arguments['check_start_date'] = check_start_date_julia_source named_arguments['check_stop_date'] = check_stop_date_julia_source description = u' ; '.join( fragment for fragment in descriptions + [node_json.get('description')] if fragment ) if description: named_arguments['description'] = generate_string_julia_source(description) comment = u' ; '.join( fragment for fragment in comments + [node_json.get('comment')] if fragment ) if comment: named_arguments['comment'] = generate_string_julia_source(comment) julia_source_by_path[u'.'.join(path_fragments)] = textwrap.dedent(u""" @define_parameter({name}, {tax_scale_type}( [ {brackets} ], {named_arguments})) 
def generate_string_julia_source(s):
    """Return the Julia string literal whose content is the text ``s``.

    A text containing a newline is emitted as a triple-quoted literal, any other text as a double-quoted one.
    Backslashes, double quotes and dollar signs are escaped with a backslash, so that Julia neither reads an
    escape sequence nor interpolates a variable where the text holds a plain character.
    """
    # Backslashes must be escaped first, otherwise the backslashes added for '"' and '$' would be doubled.
    escaped = s.replace(u'\\', u'\\\\').replace(u'"', u'\\"').replace(u'$', u'\\$')
    if u'\n' in s:
        return u'"""{}"""'.format(escaped)
    return u'"{}"'.format(escaped)
parameter_julia_source_by_path, path_fragments = [], ) julia_path = os.path.join(args.julia_package_dir, 'src', 'parameters.jl') with codecs.open(julia_path, 'w', encoding = 'utf-8') as julia_file: julia_file.write(julia_file_header) julia_file.write(u'\n') for parameter_julia_source in parameter_julia_source_by_path.itervalues(): julia_file.write(parameter_julia_source) input_variable_definition_julia_source_by_name = collections.OrderedDict() julia_source_by_name_by_module_name = {} if args.formula: columns = [tax_benefit_system.column_by_name[args.formula]] else: columns = tax_benefit_system.column_by_name.itervalues() for column in columns: print column.name parser.column = column column_formula_class = column.formula_class assert column_formula_class is not None if issubclass(column_formula_class, formulas.SimpleFormula) and column_formula_class.function is None: # Input variable input_variable_definition_julia_source_by_name[column.name] = parser.source_julia_column_without_function() continue if issubclass(column_formula_class, formulas.AbstractEntityToEntity): # EntityToPerson or PersonToEntity converters if issubclass(column_formula_class, formulas.PersonToEntity): entity = tax_benefit_system.entity_class_by_key_plural[column.entity_key_plural] if column_formula_class.operation is None: role = column_formula_class.roles[0] # print entity.key_singular, role expression = u"single_person_in_entity({variable}, get_entity(variable), {role})".format( role = name_by_role_by_entity_key_singular[entity.key_singular][role], variable = column_formula_class.variable_name, ) elif column_formula_class.operation == u'add': roles = column_formula_class.roles # print entity.key_singular, roles roles = u', [{}]'.format(u', '.join( name_by_role_by_entity_key_singular[entity.key_singular][role] for role in roles )) if roles else u'' expression = u"sum_person_in_entity({variable}, get_entity(variable){roles})".format( roles = roles, variable = 
column_formula_class.variable_name, ) elif column_formula_class.operation == u'or': roles = column_formula_class.roles # print entity.key_singular, roles roles = u', [{}]'.format(u', '.join( name_by_role_by_entity_key_singular[entity.key_singular][role] for role in roles )) if roles else u'' expression = u"any_person_in_entity({variable}, get_entity(variable){roles})".format( roles = roles, variable = column_formula_class.variable_name, ) else: assert False, u"Unexpected operation" else: roles = column_formula_class.roles # print entity.key_singular, roles roles = u', [{}]'.format(u', '.join( name_by_role_by_entity_key_singular[entity.key_singular][role] for role in roles )) if roles else u'' expression = u"entity_to_person({variable}{roles})".format( roles = roles, variable = column_formula_class.variable_name, ) julia_source = textwrap.dedent(u""" {call} do simulation, variable, period @calculate({variable}, period, accept_other_period = true) return period, {expression} end """).format( call = parser.source_julia_column_without_function(is_formula = True), expression = expression, variable = column_formula_class.variable_name, ) module_name = inspect.getmodule(column_formula_class).__name__ assert module_name.startswith('openfisca_france.model.') module_name = module_name[len('openfisca_france.model.'):] julia_source_by_name_by_module_name.setdefault(module_name, {})[column.name] = julia_source continue if column.name in ( # 'age', # custom Julia implementation # 'age_en_mois', # custom Julia implementation # 'cmu_c_plafond', # custom Julia implementation 'coefficient_proratisation', # 'nombre_jours_calendaires', # custom Julia implementation # 'remuneration_apprenti', # 'zone_apl', # custom Julia implementation ): # Skip formulas that can't be easily converted to Julia and handle them as input variables. 
input_variable_definition_julia_source_by_name[column.name] = parser.source_julia_column_without_function() continue try: formula_class_wrapper = parser.FormulaClassFileInput.parse(column_formula_class, parser = parser) except: # Stop conversion of columns, but write the existing results to Julia files. traceback.print_exc() break try: julia_source = formula_class_wrapper.juliaize().source_julia(depth = 0) except: node = formula_class_wrapper.node if node is not None: print "An exception occurred When juliaizing formula {}:\n{}\n\n{}".format(column.name, repr(node), unicode(node).encode('utf-8')) raise module_name = formula_class_wrapper.containing_module.python.__name__ assert module_name.startswith('openfisca_france.model.') module_name = module_name[len('openfisca_france.model.'):] julia_source_by_name_by_module_name.setdefault(module_name, {})[column.name] = julia_source # Add non-formula functions to modules. for function_wrapper in parser.non_formula_function_by_name.itervalues(): try: julia_source = function_wrapper.juliaize().source_julia(depth = 0) except: node = function_wrapper.node if node is not None: print "An exception occurred When juliaizing function {}:\n{}\n\n{}".format(function_wrapper.name, repr(node), unicode(node).encode('utf-8')) raise module_name = function_wrapper.containing_module.python.__name__ assert module_name.startswith('openfisca_france.model.') module_name = module_name[len('openfisca_france.model.'):] julia_source_by_name_by_module_name.setdefault(module_name, {})[function_wrapper.name] = julia_source if args.formula: for module_name, julia_source_by_name in julia_source_by_name_by_module_name.iteritems(): for column_name, julia_source in sorted(julia_source_by_name.iteritems()): print(julia_source) else: julia_path = os.path.join(args.julia_package_dir, 'src', 'input_variables.jl') with codecs.open(julia_path, 'w', encoding = 'utf-8') as julia_file: julia_file.write(julia_file_header) julia_file.write(u'\n') for 
input_variable_definition_julia_source in input_variable_definition_julia_source_by_name.itervalues(): julia_file.write(u'\n') julia_file.write(input_variable_definition_julia_source) julia_file.write(u'\n') julia_path = os.path.join(args.julia_package_dir, 'src', 'formulas.jl') with codecs.open(julia_path, 'w', encoding = 'utf-8') as julia_file: julia_file.write(julia_file_header) julia_file.write(u'\n\n') for module_name in sorted(julia_source_by_name_by_module_name.iterkeys()): julia_file.write(u'include("formulas/{}.jl")\n'.format(module_name.replace(u'.', u'/'))) for module_name, julia_source_by_name in julia_source_by_name_by_module_name.iteritems(): julia_relative_path = os.path.join(*module_name.split('.')) + '.jl' julia_path = os.path.join(args.julia_package_dir, 'src', 'formulas', julia_relative_path) julia_dir = os.path.dirname(julia_path) if not os.path.exists(julia_dir): os.makedirs(julia_dir) with codecs.open(julia_path, 'w', encoding = 'utf-8') as julia_file: julia_file.write(julia_file_header) for column_name, julia_source in sorted(julia_source_by_name.iteritems()): julia_file.write(u'\n') julia_file.write(julia_source) return 0 if __name__ == "__main__": sys.exit(main()) PK阁F.openfisca_parsers/scripts/datatrees_to_json.py#! /usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Convert datatrees dict to JSON.""" import argparse import json import logging import sys from openfisca_france import entities from openfisca_france.model import datatrees def main(): parser = argparse.ArgumentParser(description = __doc__) parser.add_argument('-v', '--verbose', action = 'store_true', default = False, help = "increase output verbosity") args = parser.parse_args() logging.basicConfig(level = logging.DEBUG if args.verbose else logging.WARNING, stream = sys.stdout) columns_name_tree_by_entity_name = { entities.entity_class_by_symbol[entity_symbol].key_singular: tree for entity_symbol, tree in datatrees.columns_name_tree_by_entity.iteritems() } print json.dumps(columns_name_tree_by_entity_name, indent = 2) return 0 if __name__ == "__main__": sys.exit(main()) PKW[FSd 4openfisca_parsers/scripts/extract_input_variables.py#! /usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . 
"""Extract input variables from Python formulas using lib2to3.""" import argparse import importlib import logging import os import sys from openfisca_parsers import input_variables_extractors app_name = os.path.splitext(os.path.basename(__file__))[0] log = logging.getLogger(app_name) def main(): parser = argparse.ArgumentParser(description = __doc__) parser.add_argument('-c', '--country-package', default = 'openfisca_france', help = u'name of the OpenFisca package to use for country-specific variables & formulas') parser.add_argument('-n', '--name', default = None, help = u'name of the formula to extract variables from (default: all)') parser.add_argument('-v', '--verbose', action = 'store_true', default = False, help = "increase output verbosity") args = parser.parse_args() logging.basicConfig(level = logging.DEBUG if args.verbose else logging.WARNING, stream = sys.stdout) country_package = importlib.import_module(args.country_package) TaxBenefitSystem = country_package.init_country() tax_benefit_system = TaxBenefitSystem() extractor = input_variables_extractors.setup(tax_benefit_system) if args.name is None: for column in tax_benefit_system.column_by_name.itervalues(): print column.name input_variables, parameters = extractor.get_input_variables_and_parameters(column) if input_variables is not None: print u' Input variables:', u', '.join(sorted(input_variables)) if parameters: print u' Parameters:', u', '.join(sorted(parameters)) else: column = tax_benefit_system.column_by_name[args.name] print column.name input_variables, parameters = extractor.get_input_variables_and_parameters(column) if input_variables is not None: print u' Input variables:', u', '.join(sorted(input_variables)) if parameters: print u' Parameters:', u', '.join(sorted(parameters)) return 0 if __name__ == "__main__": sys.exit(main()) PKJF_22=openfisca_parsers/scripts/formulas_parameters_to_variables.py#! 
/usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Convert parameters of formulas to variables computed inside their formula.""" import argparse import codecs import copy import importlib import inspect import itertools import lib2to3.pgen2.driver # , tokenize, token import lib2to3.pygram import lib2to3.pytree import logging import os import sys from openfisca_core import conv, formulas, formulas_parsers_2to3 app_name = os.path.splitext(os.path.basename(__file__))[0] log = logging.getLogger(app_name) symbols = formulas_parsers_2to3.symbols type_symbol = formulas_parsers_2to3.type_symbol tokens = formulas_parsers_2to3.tokens def main(): parser = argparse.ArgumentParser(description = __doc__) # parser.add_argument('variable', help = u'Name of the variable to calculate. 
Example: "revdisp"') parser.add_argument('-c', '--country-package', default = 'openfisca_france', help = u'Name of the OpenFisca package to use for country-specific variables & formulas') parser.add_argument('-v', '--verbose', action = 'store_true', default = False, help = "increase output verbosity") parser.add_argument('-w', '--write', action = 'store_true', default = False, help = "replace content of source files") args = parser.parse_args() logging.basicConfig(level = logging.DEBUG if args.verbose else logging.WARNING, stream = sys.stdout) country_package = importlib.import_module(args.country_package) TaxBenefitSystem = country_package.init_country() tax_benefit_system = TaxBenefitSystem() state = formulas_parsers_2to3.State( driver = lib2to3.pgen2.driver.Driver(lib2to3.pygram.python_grammar, convert = lib2to3.pytree.convert, logger = log), tax_benefit_system = tax_benefit_system, ) source_lines_by_path = {} for column in tax_benefit_system.column_by_name.itervalues(): column_formula_class = column.formula_class if column_formula_class is None: # Input variable continue if issubclass(column_formula_class, formulas.AbstractEntityToEntity): # EntityToPerson or PersonToEntity converters continue # if issubclass(column_formula_class, formulas.AlternativeFormula): # continue # TODO elif issubclass(column_formula_class, formulas.DatedFormula): pass # elif issubclass(column_formula_class, formulas.SelectFormula): # continue # TODO else: assert issubclass(column_formula_class, formulas.SimpleFormula), column_formula_class print column.name state.column = column formula_class_wrapper = conv.check(state.FormulaClassFileInput.parse)(column_formula_class, state = state) if issubclass(column_formula_class, formulas.DatedFormula): function_functions = [] for name, value in formula_class_wrapper.value_by_name.iteritems(): if isinstance(value, state.Decorator) and value.name == u'dated_function': function_function = value.decorated assert isinstance(function_function, 
state.Function) assert name.startswith('function_') or name == 'function', name function_functions.append(function_function) else: assert issubclass(column_formula_class, formulas.SimpleFormula), column_formula_class function_functions = [formula_class_wrapper.value_by_name['function']] get_output_period_function = formula_class_wrapper.get_value_by_name('get_output_period', default = None, state = state) if get_output_period_function is None: print 'Skipping variable without get_output_period method.' continue assert get_output_period_function.node.type == symbols.funcdef suite = get_output_period_function.node.children[4] assert suite.type == symbols.suite period_text = unicode(suite).strip() assert period_text.startswith('return ') period_text = period_text.replace(u'return ', 'period = ', 1) # Remove method get_output_period from Python source file. main_source_lines, main_line_number = inspect.getsourcelines(column_formula_class) function_line_number = get_output_period_function.node.get_lineno() function_lines_count = len(unicode(get_output_period_function.node).strip().split(u'\n')) function_first_line_number = main_line_number - 1 + function_line_number function_after_line_number = function_first_line_number + function_lines_count module = inspect.getmodule(column_formula_class) source_file_path = module.__file__ if source_file_path.endswith('.pyc'): source_file_path = source_file_path[:-1] source_lines = source_lines_by_path.get(source_file_path) if source_lines is None: with codecs.open(source_file_path, "r", encoding = 'utf-8') as source_file: source_text = source_file.read() source_lines_by_path[source_file_path] = source_lines = source_text.split(u'\n') source_lines[function_first_line_number - 1:function_after_line_number - 1] = [None] * function_lines_count for function_function in function_functions: assert function_function.node.type == symbols.funcdef colon_node = function_function.node.children[3] assert colon_node.type == tokens.COLON comment = 
function_function.node.children[3].get_suffix() if comment: comment = comment.decode('utf-8').strip().lstrip('#').lstrip() if comment: comment = u' # ' + comment variables_name = [ parameter_name for parameter_name in function_function.positional_parameters if parameter_name not in ('_defaultP', '_P', 'period', 'self') ] variables_line = [] for variable_name in variables_name: if variable_name.endswith('_holder'): line = u"{} = simulation.compute('{}', period)".format(variable_name, variable_name[:-len('_holder')]) else: line = u"{} = simulation.calculate('{}', period)".format(variable_name, variable_name) variables_line.append(line) variables_text = u' ' + u'\n '.join(variables_line) if variables_line else u'' law_node_lines = [] if '_defaultP' in function_function.positional_parameters: law_node_lines.append( u' _defaultP = simulation.legislation_at(period.start, reference = True)\n') if '_P' in function_function.positional_parameters: law_node_lines.append(u' _P = simulation.legislation_at(period.start)\n') law_node_by_name = { name: value for name, value in function_function.named_parameters.iteritems() } law_node_by_name = copy.deepcopy(function_function.named_parameters) for parameter_name, law_node in sorted(law_node_by_name.iteritems()): law_node_path = unicode(law_node).strip() assert law_node_path.startswith(u'law') law_node_path = law_node_path[len(u'law'):] law_node_lines.append(u' {} = simulation.legislation_at(period.start){}\n'.format(parameter_name, law_node_path)) law_nodes_text = u''.join(law_node_lines) # Replace "return {array}" statements with "return period, {array}". 
if not function_function.returns: print "Missing return statement in {}".format(function_function.name) for return_wrapper in function_function.returns: # print "###{}###{}".format(return_wrapper.node, repr(return_wrapper.node)) return_children = return_wrapper.node.children assert len(return_children) == 2 return_value = return_children[1] del return_value.parent return_children[1] = lib2to3.pytree.Node( symbols.testlist, [ lib2to3.pytree.Leaf(tokens.NAME, 'period'), lib2to3.pytree.Leaf(tokens.COMMA, ','), return_value ], prefix = ' ', ) # print "###{}###{}".format(return_wrapper.node, repr(return_wrapper.node)) suite = function_function.node.children[4] assert suite.type == symbols.suite body_index = None doc_text = u'' for statement_index, statement in enumerate(suite.children): if statement.type in (tokens.INDENT, tokens.NEWLINE): continue if statement.type in (symbols.funcdef, symbols.if_stmt): body_index = statement_index break assert statement.type == symbols.simple_stmt, type_symbol(statement.type) if statement.children: statement_child = statement.children[0] if statement_child.type == tokens.STRING: doc_text = unicode(statement).rstrip() + u'\n ' body_index = statement_index + 1 break body_index = statement_index break assert body_index is not None body_text = unicode(u''.join( unicode(statement) for statement in itertools.islice(suite.children, body_index, None) )).strip() function_text = u"""\ def {name}(self, simulation, period):{comment} {doc}{period} {variables} {law_nodes} {body}\ """.format( body = body_text, doc = doc_text, comment = comment, law_nodes = law_nodes_text, name = function_function.name, period = period_text, variables = variables_text, ) # Replace old method with new method in Python source file. 
function_line_number = function_function.node.get_lineno() function_lines_count = len(unicode(function_function.node).strip().split(u'\n')) function_first_line_number = main_line_number - 1 + function_line_number function_after_line_number = function_first_line_number + function_lines_count module = inspect.getmodule(column_formula_class) source_file_path = module.__file__ if source_file_path.endswith('.pyc'): source_file_path = source_file_path[:-1] source_lines = source_lines_by_path.get(source_file_path) if source_lines is None: with codecs.open(source_file_path, "r", encoding = 'utf-8') as source_file: source_text = source_file.read() source_lines_by_path[source_file_path] = source_lines = source_text.split(u'\n') source_lines[function_first_line_number - 1:function_after_line_number - 1] = [function_text] \ + [None] * (function_lines_count - 1) if args.write: for source_file_path, source_lines in source_lines_by_path.iteritems(): with codecs.open(source_file_path, "w", encoding = 'utf-8') as source_file: source_file.write(u'\n'.join( line for line in source_lines if line is not None )) return 0 if __name__ == "__main__": sys.exit(main()) PK.F?I5050:openfisca_parsers/scripts/formulas_functions_to_classes.py#! /usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. 
# # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Convert Python formulas defined as functions to new syntax using classes.""" import argparse import codecs import importlib import inspect import itertools import lib2to3.pgen2.driver # , tokenize, token import lib2to3.pygram import lib2to3.pytree import logging import os import sys import textwrap from openfisca_core import columns, conv, formulas, formulas_parsers_2to3 app_name = os.path.splitext(os.path.basename(__file__))[0] log = logging.getLogger(app_name) symbols = formulas_parsers_2to3.symbols tokens = formulas_parsers_2to3.tokens def extract_formula_function_infos(function, state = None): formula_function = conv.check(state.FormulaFunctionFileInput.parse)(function, state = state) definition_colon_node = formula_function.node.children[3] assert definition_colon_node.type == tokens.COLON comment = formula_function.node.children[3].get_suffix() if comment: comment = comment.decode('utf-8').strip().lstrip('#').lstrip() if comment: comment = u' # ' + comment parameters_text = u', '.join(itertools.chain( [u'self'], ( parameter_name for parameter_name in formula_function.positional_parameters if parameter_name != 'self' ), ( u'{} = {}'.format(parameter_name, parameter_value) for parameter_name, parameter_value in formula_function.named_parameters.iteritems() if parameter_name != 'self' ), )) suite_node = formula_function.node.children[4] assert suite_node.type == symbols.suite body_text = u' ' + '\n '.join(textwrap.dedent(unicode(suite_node).strip()).split('\n')) return dict( body = body_text, comment = comment, parameters = parameters_text, ) def main(): parser = argparse.ArgumentParser(description = __doc__) # parser.add_argument('variable', help = u'Name of the variable to calculate. 
Example: "revdisp"') parser.add_argument('-c', '--country-package', default = 'openfisca_france', help = u'Name of the OpenFisca package to use for country-specific variables & formulas') parser.add_argument('-v', '--verbose', action = 'store_true', default = False, help = "increase output verbosity") parser.add_argument('-w', '--write', action = 'store_true', default = False, help = "replace content of source files") args = parser.parse_args() logging.basicConfig(level = logging.DEBUG if args.verbose else logging.WARNING, stream = sys.stdout) country_package = importlib.import_module(args.country_package) TaxBenefitSystem = country_package.init_country() tax_benefit_system = TaxBenefitSystem() state = formulas_parsers_2to3.State( driver = lib2to3.pgen2.driver.Driver(lib2to3.pygram.python_grammar, convert = lib2to3.pytree.convert, logger = log), tax_benefit_system = tax_benefit_system, ) source_lines_by_path = {} for column in tax_benefit_system.column_by_name.itervalues(): column_formula_class = column.formula_class if column_formula_class is None: # Input variable continue if issubclass(column_formula_class, formulas.AbstractEntityToEntity): # EntityToPerson or PersonToEntity converters continue if issubclass(column_formula_class, formulas.AlternativeFormula): continue # TODO formula_column_class_name = u'AlternativeFormulaColumn' elif issubclass(column_formula_class, formulas.DatedFormula): formula_column_class_name = u'DatedFormulaColumn' elif issubclass(column_formula_class, formulas.SelectFormula): continue # TODO formula_column_class_name = u'SelectFormulaColumn' else: assert issubclass(column_formula_class, formulas.SimpleFormula), column_formula_class formula_column_class_name = u'SimpleFormulaColumn' try: source_lines, line_number = inspect.getsourcelines(column_formula_class) except IOError: # IOError: could not find class definition. The formula is defined using obsolete build_..._formula # function. 
pass else: # Skip classes build using ...FormulaColumn classes. continue print column.name state.column = column for attribute_name, attribute_value in column.__dict__.iteritems(): assert attribute_name in ( 'cerfa_field', 'default', 'end', 'entity', 'entity_key_plural', 'enum', 'formula_class', 'label', 'name', 'start', 'url', ), "Unexpected attribute {} = {} in column".format(attribute_name, attribute_value) default_text = column.default if column.default is not None else None if isinstance(column, columns.EnumCol): item_first_index = iter(column.enum).next()[1] items = u'\n'.join( u' "{}",'.format(item_key) for item_key, item_index in column.enum ) column_text = u"""{name}({default}enum = Enum( [ {items} ], {start}))""".format( default = u'default = {}, '.format(default_text) if default_text is not None else u'', items = items, name = column.__class__.__name__, start = u'start = {},\n '.format(item_first_index) if item_first_index > 0 else u'', ) elif default_text is not None: column_text = u'{name}(default = {default})'.format( default = default_text, name = column.__class__.__name__, ) else: column_text = unicode(column.__class__.__name__) label = column.label label_text = u'u"""{}"""'.format(label) if u'"' in label else u'u"{}"'.format(label) attributes = [] if column.cerfa_field is not None: TODO # Handle non-string Cerfa fields. 
attributes.append((u'cerfa_field', u'"{}"'.format(column.cerfa_field))) attributes.append((u'column', column_text)) attributes.append((u'entity_class', tax_benefit_system.entity_class_by_key_plural[column.entity_key_plural].__name__)) attributes.append((u'label', label_text)) if column.start is not None: attributes.append((u'start_date', u"date({}, {}, {})".format(column.start.year, column.start.month, column.start.day))) if column.end is not None: attributes.append((u'stop_date', u"date({}, {}, {})".format(column.end.year, column.end.month, column.end.day))) if column.url is not None: attributes.append((u'url', u'"{}"'.format(column.url))) formula_column_text = u"""\ @reference_formula class {name}({class_name}): {attributes}""".format( attributes = u''.join( u' {} = {}\n'.format(name, value) for name, value in attributes ), class_name = formula_column_class_name, name = column.name, ) if issubclass(column_formula_class, formulas.AlternativeFormula): TODO formulas_function = [ formula_class.function for formula_class in column_formula_class.alternative_formulas_class ] elif issubclass(column_formula_class, formulas.DatedFormula): formulas_function = [] for dated_formula_class in column_formula_class.dated_formulas_class: formula_function = dated_formula_class['formula_class'].function formula_function_infos = extract_formula_function_infos(formula_function, state = state) start_date = dated_formula_class['start_instant'].date start_triple = (start_date.year, start_date.month, start_date.day) stop_date = dated_formula_class['stop_instant'].date stop_triple = (stop_date.year, stop_date.month, stop_date.day) formula_column_text += u""" @dated_function(start = date{start_triple}, stop = date{stop_triple}) def function_{start_name}_{stop_name}({parameters}):{comment} {body} """.format( start_name = u'{:04d}{:02d}{:02d}'.format(*start_triple), start_triple = start_triple, stop_name = u'{:04d}{:02d}{:02d}'.format(*stop_triple), stop_triple = stop_triple, 
**formula_function_infos) formulas_function.append(formula_function) elif issubclass(column_formula_class, formulas.SelectFormula): TODO formulas_function = [ select_formula_class.function for select_formula_class in column_formula_class.formula_class_by_main_variable_name.itervalues() ] else: assert issubclass(column_formula_class, formulas.SimpleFormula), column_formula_class formula_function_infos = extract_formula_function_infos(column_formula_class.function, state = state) formula_column_text += u""" def function({parameters}):{comment} {body} """.format(**formula_function_infos) formulas_function = [column_formula_class.function] formula_column_text += u""" def get_output_period(self, period): return period.start.offset('first-of', 'month').period('year') """ if args.verbose: print formula_column_text for formula_function_index, formula_function in enumerate(formulas_function): module = inspect.getmodule(formula_function) source_file_path = module.__file__ if source_file_path.endswith('.pyc'): source_file_path = source_file_path[:-1] function_source_lines, line_number = inspect.getsourcelines(formula_function) source_lines = source_lines_by_path.get(source_file_path) if source_lines is None: with codecs.open(source_file_path, "r", encoding = 'utf-8') as source_file: source_text = source_file.read() source_lines_by_path[source_file_path] = source_lines = source_text.split(u'\n') if formula_function_index == 0: source_lines[line_number - 1:line_number - 1 + len(function_source_lines)] = [formula_column_text] \ + [None] * (len(function_source_lines) - 1) else: source_lines[line_number - 1:line_number - 1 + len(function_source_lines)] \ = [None] * len(function_source_lines) if args.write: for source_file_path, source_lines in source_lines_by_path.iteritems(): with codecs.open(source_file_path, "w", encoding = 'utf-8') as source_file: source_file.write(u'\n'.join( line for line in source_lines if line is not None )) return 0 if __name__ == "__main__": 
sys.exit(main()) PK ~F4΢3openfisca_parsers/scripts/decomposition_to_julia.py#! /usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Convert XML decomposition to Julia.""" import argparse import codecs import importlib import logging import os import sys import textwrap import xml.etree from biryani.baseconv import check from openfisca_core import decompositionsxml julia_file_header = textwrap.dedent(u"""\ # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. 
# # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """) def transform_julia_list_tree_to_julia_source_code(node, depth = 0): indent_level = 2 return u'{depth}{node[name]} "{node[label]}" "{node[short_label]}" [{node[color]}]{children}\n'.format( children = '' if node['children'] is None else u' [\n{inner_children}{depth}]'.format( depth = ' ' * indent_level * depth, inner_children = ''.join( transform_julia_list_tree_to_julia_source_code(child_node, depth + 1) for child_node in node['children'] ), ), depth = ' ' * indent_level * depth, node = node, ) def transform_node_xml_json_to_julia_list_tree(node_xml_json): children_variables_name = map(transform_node_xml_json_to_julia_list_tree, node_xml_json['NODE']) \ if node_xml_json.get('NODE') \ else None return { 'children': children_variables_name, 'color': node_xml_json.get('color') or u'0,0,0', 'label': node_xml_json['desc'], 'name': node_xml_json['code'], 'short_label': node_xml_json['shortname'], } def xml_to_julia(tax_benefit_system, tree): xml_json = check(decompositionsxml.xml_decomposition_to_json)(tree.getroot()) xml_json = check(decompositionsxml.make_validate_node_xml_json(tax_benefit_system))(xml_json) julia_list_tree = transform_node_xml_json_to_julia_list_tree(xml_json) julia_source_code = transform_julia_list_tree_to_julia_source_code(julia_list_tree) return julia_source_code def main(): parser = argparse.ArgumentParser(description = __doc__) parser.add_argument('julia_package_dir', help = u'path of the directory of the OpenFisca Julia package') parser.add_argument('-c', '--country-package', default = 'openfisca_france', help = u'name of the OpenFisca package to use for country-specific variables & formulas') parser.add_argument('-d', '--decomposition', default = None, help = u'file path of the decomposition XML to convert') parser.add_argument('-v', '--verbose', action = 'store_true', default = False, help = "increase output verbosity") 
args = parser.parse_args() logging.basicConfig(level = logging.DEBUG if args.verbose else logging.WARNING, stream = sys.stdout) country_package = importlib.import_module(args.country_package) TaxBenefitSystem = country_package.init_country() tax_benefit_system = TaxBenefitSystem() xml_file_path = os.path.join(tax_benefit_system.DECOMP_DIR, tax_benefit_system.DEFAULT_DECOMP_FILE) \ if args.decomposition is None else args.decomposition tree = xml.etree.ElementTree.parse(xml_file_path) decomposition_julia = u'default_decomposition = @define_decomposition ' + xml_to_julia(tax_benefit_system, tree) julia_path = os.path.join(args.julia_package_dir, 'src', 'decompositions.jl') with codecs.open(julia_path, 'w', encoding = 'utf-8') as julia_file: julia_file.write(julia_file_header) julia_file.write(u'\n\n') julia_file.write( textwrap.dedent(u"""\ # Generated by openfisca-parsers script "{script}" # From XML decomposition "{xml_source}" in "{country_package}" # WARNING: Any manual modification may be lost. """.format( country_package = args.country_package, script = __file__, xml_source = os.path.basename(xml_file_path), )) ) julia_file.write(u'\n\n') julia_file.write(decomposition_julia) return 0 if __name__ == "__main__": sys.exit(main()) PK1ZF3openfisca_parsers/scripts/extract_variables_tree.py#! /usr/bin/env python # -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Extract and print the location of input & computed variables.""" import argparse import codecs import importlib import logging import os import pyclbr import re import sys app_name = os.path.splitext(os.path.basename(__file__))[0] build_column_re = re.compile(ur'(?ms)build_column\(\s*(?P[^,]+),') log = logging.getLogger(app_name) reference_input_variable_re = re.compile( ur'(?ms)reference_input_variable\(.+?name\s*=\s*(?P[^,)\s]+)') def main(): parser = argparse.ArgumentParser(description = __doc__) parser.add_argument('-c', '--country-package', default = 'openfisca_france', help = u'name of the OpenFisca package to use for country-specific variables & formulas') parser.add_argument('-i', '--input', action = 'store_true', default = False, help = "extract input variables") parser.add_argument('-o', '--computed', action = 'store_true', default = False, help = "extract computed variables") parser.add_argument('-v', '--verbose', action = 'store_true', default = False, help = "increase output verbosity") args = parser.parse_args() logging.basicConfig(level = logging.DEBUG if args.verbose else logging.WARNING, stream = sys.stdout) country_package = importlib.import_module(args.country_package) variables_tree = create_variables_tree(country_package, input_variables = args.input, computed_variables = args.computed) print_variables_node(variables_tree) return 0 def create_variables_tree(country_package, input_variables = False, computed_variables = False): root_dir = os.path.dirname(country_package.__file__) variables_tree = {} if input_variables: for (dir, directories_name, filenames) in os.walk(root_dir): for filename in filenames: if filename.endswith('.py'): python_file_path = os.path.join(dir, filename) with codecs.open(python_file_path, encoding = 'utf-8') as python_file: python_source = 
python_file.read() module_variables_name = [] for match in build_column_re.finditer(python_source): quoted_variable_name = match.group('quoted_variable_name') variable_name = quoted_variable_name[1:-1] module_variables_name.append(variable_name) for match in reference_input_variable_re.finditer(python_source): quoted_variable_name = match.group('quoted_variable_name') variable_name = quoted_variable_name[1:-1] module_variables_name.append(variable_name) if module_variables_name: module_path = [ module_name for module_name in os.path.splitext(python_file_path)[0][len(root_dir):].split(u'/') if module_name ] variables_node = variables_tree for module_name in module_path: variables_node = variables_node.setdefault('children', {}).setdefault(module_name, {}) if 'variables' in variables_node: module_variables_name += variables_node['variables'] module_variables_name.sort() variables_node['variables'] = module_variables_name if computed_variables: TaxBenefitSystem = country_package.init_country() tax_benefit_system = TaxBenefitSystem() for module_name in sys.modules.keys(): if module_name.startswith(country_package.__name__) and not module_name.endswith('.__future__'): try: class_data_by_name = pyclbr.readmodule(module_name) except ImportError: continue module_variables_name = [] for class_name, class_data in class_data_by_name.iteritems(): if class_name in tax_benefit_system.column_by_name: module_variables_name.append(class_name) if module_variables_name: module_path = [ name for name in module_name[len(country_package.__name__):].split(u'.') if name ] variables_node = variables_tree for module_name in module_path: variables_node = variables_node.setdefault('children', {}).setdefault(module_name, {}) if 'variables' in variables_node: module_variables_name += variables_node['variables'] module_variables_name.sort() variables_node['variables'] = module_variables_name return variables_tree def print_variables_node(variables_node, indent = 0): for variable_name in 
(variables_node.get('variables') or []): print '{}{}'.format(u' ' * indent, variable_name) for module_name, module_node in sorted((variables_node.get('children') or {}).iteritems()): print '{}* {}'.format(u' ' * indent, module_name) print_variables_node(module_node, indent = indent + 1) if __name__ == "__main__": sys.exit(main()) PK)G^- 1OpenFisca_Parsers-0.5.0.dist-info/DESCRIPTION.rstUNKNOWN PK)G[)A/OpenFisca_Parsers-0.5.0.dist-info/metadata.json{"classifiers": ["Development Status :: 2 - Pre-Alpha", "License :: OSI Approved :: GNU Affero General Public License v3", "Operating System :: POSIX", "Programming Language :: Python", "Topic :: Scientific/Engineering :: Information Analysis"], "extensions": {"python.details": {"contacts": [{"email": "contact@openfisca.fr", "name": "OpenFisca Team", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://github.com/openfisca/openfisca-parsers"}}}, "extras": [], "generator": "bdist_wheel (0.24.0)", "keywords": ["benefit", "compiler", "lexer", "microsimulation", "parser", "social", "tax"], "license": "http://www.fsf.org/licensing/licenses/agpl-3.0.html", "metadata_version": "2.0", "name": "OpenFisca-Parsers", "run_requires": [{"requires": ["Biryani[datetimeconv] (>=0.10.1)", "OpenFisca-Core (>=0.5.0)", "numpy (>=1.6)"]}], "summary": "Lexers, parsers, compilers, etc for OpenFisca source code", "version": "0.5.0"}PK)GsHL//*OpenFisca_Parsers-0.5.0.dist-info/pbr.json{"is_release": false, "git_version": "a901c33"}PK)G̮X/OpenFisca_Parsers-0.5.0.dist-info/top_level.txtopenfisca_parsers PK)G4\\'OpenFisca_Parsers-0.5.0.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.24.0) Root-Is-Purelib: true Tag: py2-none-any PK)GXA*OpenFisca_Parsers-0.5.0.dist-info/METADATAMetadata-Version: 2.0 Name: OpenFisca-Parsers Version: 0.5.0 Summary: Lexers, parsers, compilers, etc for OpenFisca source code Home-page: https://github.com/openfisca/openfisca-parsers Author: OpenFisca Team 
Author-email: contact@openfisca.fr License: http://www.fsf.org/licensing/licenses/agpl-3.0.html Keywords: benefit compiler lexer microsimulation parser social tax Platform: UNKNOWN Classifier: Development Status :: 2 - Pre-Alpha Classifier: License :: OSI Approved :: GNU Affero General Public License v3 Classifier: Operating System :: POSIX Classifier: Programming Language :: Python Classifier: Topic :: Scientific/Engineering :: Information Analysis Requires-Dist: Biryani[datetimeconv] (>=0.10.1) Requires-Dist: OpenFisca-Core (>=0.5.0) Requires-Dist: numpy (>=1.6) UNKNOWN PK)GtH(OpenFisca_Parsers-0.5.0.dist-info/RECORDOpenFisca_Parsers-0.5.0.dist-info/RECORD,, OpenFisca_Parsers-0.5.0.dist-info/WHEEL,sha256=54bVun1KfEBTJ68SHUmbxNPj80VxlQ0sHi4gZdGZXEY,92 OpenFisca_Parsers-0.5.0.dist-info/DESCRIPTION.rst,sha256=OCTuuN6LcWulhHS3d5rfjdsQtW22n7HENFRh6jC6ego,10 OpenFisca_Parsers-0.5.0.dist-info/pbr.json,sha256=YtG7460cqj14dGqQqir9Bm2epf-0ACbnsDyiUvFFS-0,47 OpenFisca_Parsers-0.5.0.dist-info/METADATA,sha256=CVTWB8HMLLyf54SJg9hSG3EtWrQkpCf2hBxVbNUfDqo,791 OpenFisca_Parsers-0.5.0.dist-info/top_level.txt,sha256=YQ5dokXD8iX_hr1Oedd7uJU0XudMsf13ufg8cGTbv4M,18 OpenFisca_Parsers-0.5.0.dist-info/metadata.json,sha256=j_X0SGGW-XDpv9FNNiDgce3za0qqjt2FfL08wfSHfuY,976 openfisca_parsers/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 openfisca_parsers/formulas_parsers_2to3.py,sha256=h2ONPRNr3eyU2ZPhycMrIfVIeFuu0aDnnyzLILCH85k,140122 openfisca_parsers/input_variables_extractors.py,sha256=ezZQYOOvh6McfNf32TVTLC3cOZB92IyiXA1dJnZL1TY,5324 openfisca_parsers/scripts/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 openfisca_parsers/scripts/formulas_to_julia.py,sha256=tNFHMH15E4JgK3fvcGCkAOpFmpVpUoEbU0_H8sp9tnw,135156 openfisca_parsers/scripts/datatrees_to_json.py,sha256=DASjQWOVp5ezdiveSdkZR6-oHHVgVDzxF6irKqsE0Us,1776 openfisca_parsers/scripts/extract_input_variables.py,sha256=HfC1UCk-hQCd3CIt5feXNPEX6Et_IZgk_tECtZPJsRc,2999 
openfisca_parsers/scripts/formulas_parameters_to_variables.py,sha256=QWb2gN_vtF6IKplRv560KlANaLlQ36esbvhYH5hSzew,12804 openfisca_parsers/scripts/formulas_functions_to_classes.py,sha256=hLeYsrDEr3ODzgZHAUHcCPe2ZWmf1tETixFQCoOzpOI,12341 openfisca_parsers/scripts/decomposition_to_julia.py,sha256=xMDzPg4YRPAI9BCgCNIrjbcZ0MLENlKndMgYRxq6ioQ,5818 openfisca_parsers/scripts/extract_variables_tree.py,sha256=gqi-sDWEeJdqdnXt1rv1CWIlGJLMFH8f3Shae-DBbPY,6360 PK.Fopenfisca_parsers/__init__.pyPKW[F Z#Z#*;openfisca_parsers/formulas_parsers_2to3.pyPKdFd/#openfisca_parsers/input_variables_extractors.pyPK1ZF%8openfisca_parsers/scripts/__init__.pyPK(F-U.99openfisca_parsers/scripts/formulas_to_julia.pyPK阁F.yIopenfisca_parsers/scripts/datatrees_to_json.pyPKW[FSd 4Popenfisca_parsers/scripts/extract_input_variables.pyPKJF_22=\openfisca_parsers/scripts/formulas_parameters_to_variables.pyPK.F?I5050:openfisca_parsers/scripts/formulas_functions_to_classes.pyPK ~F4΢3openfisca_parsers/scripts/decomposition_to_julia.pyPK1ZF3openfisca_parsers/scripts/extract_variables_tree.pyPK)G^- 1OpenFisca_Parsers-0.5.0.dist-info/DESCRIPTION.rstPK)G[)A/7OpenFisca_Parsers-0.5.0.dist-info/metadata.jsonPK)GsHL//*TOpenFisca_Parsers-0.5.0.dist-info/pbr.jsonPK)G̮X/OpenFisca_Parsers-0.5.0.dist-info/top_level.txtPK)G4\\'*OpenFisca_Parsers-0.5.0.dist-info/WHEELPK)GXA*OpenFisca_Parsers-0.5.0.dist-info/METADATAPK)GtH(*OpenFisca_Parsers-0.5.0.dist-info/RECORDPKv~