PKT'G.ROpenFisca_Web_API-0.5.1.data/data/share/locale/fr/LC_MESSAGES/openfisca-web-api.mo, -!;]r&!!/C&b9>%$&,0S!$-!O9e>Access DeniedAt least one scenario is requiredBad content-type: {}Bad parameters in requestFunction "{}" not found in module "{}"Invalid JSON in request POST bodyInvalid path: {0}Module "{}" not foundNo reform was declared to the APINot implementedPath not found: {0}Server is overloaded: {} {} {}There can't be more than 100 scenariosUnable to AccessYou cannot use HTTP {} to access this URL. Use one of {}.openfisca_parsers is not installed on this instance of the APIProject-Id-Version: OpenFisca-Web-API 0.5.dev0 Report-Msgid-Bugs-To: contact@openfisca.fr POT-Creation-Date: 2015-09-07 19:25+0200 PO-Revision-Date: 2015-01-19 15:24+0100 Last-Translator: Christophe Benz Language-Team: fr Plural-Forms: nplurals=2; plural=(n > 1) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Generated-By: Babel 1.3 Accès refuséAu moins un scénario est nécessaireContent-type erroné : {}Mauvais paramètres dans la requêteFunction "{}" not found in module "{}"JSON incorrect dans le corps de la requête POSTChemin incorrect : {0}Module "{}" not foundNo reform was declared to the APINot implementedChemin non trouvé : {0}Le serveur est surchargé : {} {} {}Il ne peut pas y avoir plus de 100 scénariosImpossible d'accéderYou cannot use HTTP {} to access this URL. Use one of {}.openfisca_parsers is not installed on this instance of the APIPK'G.*LOpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/test.ini# OpenFisca-Web-API - Development environment configuration # # The %(here)s variable will be replaced with the parent directory of this file. [DEFAULT] debug = true [app:main] use = egg:OpenFisca-Web-API country_package = openfisca_france log_level = DEBUG reforms = openfisca_france.reforms.trannoy_wasmer.build_reform # Logging configuration [loggers] keys = root, openfisca_web_api [handlers] keys = console [formatters] keys = generic [logger_root] level = INFO handlers = console [logger_openfisca_web_api] level = DEBUG handlers = qualname = openfisca_web_api [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic [formatter_generic] format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s datefmt = %H:%M:%S PK'GZ{ZOpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/development-france.ini# OpenFisca-Web-API - Development environment configuration # # The %(here)s variable will be replaced with the parent directory of this file. [DEFAULT] debug = true # Uncomment and replace with the address which should receive any error reports ; email_to = you@yourdomain.com ; from_address = openfisca-web-api@localhost ; smtp_server = localhost [server:main] use = egg:Paste#http host = 0.0.0.0 port = 2000 [app:main] use = egg:OpenFisca-Web-API country_package = openfisca_france log_level = DEBUG reforms = ; openfisca_france_reform_landais_piketty_saez.build_reform ; openfisca_france_reform_revenu_de_base_cotisations.build_reform ; openfisca_france_reform_revenu_de_base_enfants.build_reform ; openfisca_france.reforms.allocations_familiales_imposables.build_reform ; openfisca_france.reforms.cesthra_invalidee.build_reform ; openfisca_france.reforms.inversion_revenus.build_reform ; openfisca_france.reforms.paris.build_reform ; openfisca_france.reforms.plf2015.build_reform ; openfisca_france.reforms.plfr2014.build_reform ; openfisca_france.reforms.trannoy_wasmer.build_reform # Logging configuration [loggers] keys = root, openfisca_web_api [handlers] keys = console [formatters] keys = generic [logger_root] level = INFO handlers = console [logger_openfisca_web_api] level = DEBUG handlers = qualname = openfisca_web_api [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic [formatter_generic] format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s datefmt = %H:%M:%S PKFm1i[OpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/development-tunisia.ini# OpenFisca-Web-API - Development environment configuration # # The %(here)s variable will be replaced with the parent directory of this file. [DEFAULT] debug = true # Uncomment and replace with the address which should receive any error reports ; email_to = you@yourdomain.com ; from_address = openfisca-web-api@localhost ; smtp_server = localhost [server:main] use = egg:Paste#http host = 127.0.0.1 port = 2001 [app:main] use = egg:OpenFisca-Web-API country_package = openfisca_tunisia log_level = DEBUG # Logging configuration [loggers] keys = root, openfisca_web_api [handlers] keys = console [formatters] keys = generic [logger_root] level = INFO handlers = console [logger_openfisca_web_api] level = DEBUG handlers = qualname = openfisca_web_api [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic [formatter_generic] format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s datefmt = %H:%M:%S PKFgexopenfisca_web_api/urls.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Helpers for URLs""" import re import urllib import urlparse from biryani import strings import webob from . import contexts, wsgihelpers application_url = None # Set to req.application_url as soon as application is called. def get_base_url(ctx, full = False): assert not(application_url is None and full), u"Can't use full URLs when application_url is not inited." base_url = (application_url or u'').rstrip('/') if not full: # When a full URL is not requested, remove scheme and network location from it. base_url = urlparse.urlsplit(base_url).path return base_url def get_full_url(ctx, *path, **query): path = [ urllib.quote(unicode(sub_fragment).encode('utf-8'), safe = ',/:').decode('utf-8') for fragment in path if fragment for sub_fragment in unicode(fragment).split(u'/') if sub_fragment ] query = dict( (str(name), strings.deep_encode(value)) for name, value in sorted(query.iteritems()) if value not in (None, [], (), '') ) return u'{0}/{1}{2}'.format(get_base_url(ctx, full = True), u'/'.join(path), ('?' + urllib.urlencode(query, doseq = True)) if query else '') def get_url(ctx, *path, **query): path = [ urllib.quote(unicode(sub_fragment).encode('utf-8'), safe = ',/:').decode('utf-8') for fragment in path if fragment for sub_fragment in unicode(fragment).split(u'/') if sub_fragment ] query = dict( (str(name), strings.deep_encode(value)) for name, value in sorted(query.iteritems()) if value not in (None, [], (), '') ) return u'{0}/{1}{2}'.format(get_base_url(ctx), u'/'.join(path), ('?' + urllib.urlencode(query, doseq = True)) if query else '') def make_router(*routings): """Return a WSGI application that dispatches requests to controllers """ routes = [] for routing in routings: methods, regex, app = routing[:3] if isinstance(methods, basestring): methods = (methods,) vars = routing[3] if len(routing) >= 4 else {} routes.append((methods, re.compile(unicode(regex)), app, vars)) def router(environ, start_response): """Dispatch request to controllers.""" req = webob.Request(environ) split_path_info = req.path_info.split('/') if split_path_info[0]: # When path_info doesn't start with a "/" this is an error or a attack => Reject request. # An example of an URL with such a invalid path_info: http://127.0.0.1http%3A//127.0.0.1%3A80/result?... ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) return wsgihelpers.respond_json(ctx, dict( apiVersion = 1, error = dict( code = 400, # Bad Request message = ctx._(u"Invalid path: {0}").format(req.path_info), ), ), headers = headers, )(environ, start_response) for methods, regex, app, vars in routes: match = regex.match(req.path_info) if match is not None: if methods is not None and req.method not in methods: ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) return wsgihelpers.respond_json(ctx, dict( apiVersion = 1, error = dict( code = 405, message = ctx._(u"You cannot use HTTP {} to access this URL. Use one of {}.").format( req.method, methods), ), ), headers = headers, )(environ, start_response) if getattr(req, 'urlvars', None) is None: req.urlvars = {} req.urlvars.update(match.groupdict()) req.urlvars.update(vars) req.script_name += req.path_info[:match.end()] req.path_info = req.path_info[match.end():] return app(req.environ, start_response) ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) return wsgihelpers.respond_json(ctx, dict( apiVersion = 1, error = dict( code = 404, # Not Found message = ctx._(u"Path not found: {0}").format(req.path_info), ), ), headers = headers, )(environ, start_response) return router def relative_query(inputs, **query): inputs = inputs.copy() inputs.update(query) return inputs PKF&openfisca_web_api/__init__.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . conf = {} # Dictionary updated by environment.load_environment PK'G openfisca_web_api/wsgihelpers.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Decorators to wrap functions to make them WSGI applications. The main decorator :class:`wsgify` turns a function into a WSGI application. """ import collections import datetime import json from functools import update_wrapper import webob.dec import webob.exc def N_(message): return message errors_title = { 400: N_("Unable to Access"), 401: N_("Access Denied"), 403: N_("Access Denied"), 404: N_("Unable to Access"), } def wsgify(func, *args, **kwargs): result = webob.dec.wsgify(func, *args, **kwargs) update_wrapper(result, func) return result def handle_cross_origin_resource_sharing(ctx): # Cf http://www.w3.org/TR/cors/#resource-processing-model environ = ctx.req.environ headers = [] origin = environ.get('HTTP_ORIGIN') if origin is None: return headers if ctx.req.method == 'OPTIONS': method = environ.get('HTTP_ACCESS_CONTROL_REQUEST_METHOD') if method is None: return headers headers_name = environ.get('HTTP_ACCESS_CONTROL_REQUEST_HEADERS') or '' headers.append(('Access-Control-Allow-Credentials', 'true')) headers.append(('Access-Control-Allow-Origin', origin)) headers.append(('Access-Control-Max-Age', '3628800')) headers.append(('Access-Control-Allow-Methods', method)) headers.append(('Access-Control-Allow-Headers', headers_name)) raise webob.exc.status_map[204](headers = headers) # No Content headers.append(('Access-Control-Allow-Credentials', 'true')) headers.append(('Access-Control-Allow-Origin', origin)) headers.append(('Access-Control-Expose-Headers', 'WWW-Authenticate')) return headers def respond_json(ctx, data, code = None, headers = [], json_dumps_default = None, jsonp = None): """Return a JSON response. This function is optimized for JSON following `Google JSON Style Guide `_, but will handle any JSON except for HTTP errors. """ if isinstance(data, collections.Mapping): # Remove null properties as recommended by Google JSON Style Guide. data = type(data)( (name, value) for name, value in data.iteritems() if value is not None ) error = data.get('error') if isinstance(error, collections.Mapping): error = data['error'] = type(error)( (name, value) for name, value in error.iteritems() if value is not None ) else: error = None if jsonp: content_type = 'application/javascript; charset=utf-8' else: content_type = 'application/json; charset=utf-8' if error: code = code or error['code'] assert isinstance(code, int) response = webob.exc.status_map[code](headers = headers) response.content_type = content_type if code == 204: # No content return response if error.get('code') is None: error['code'] = code if error.get('message') is None: title = errors_title.get(code) title = ctx._(title) if title is not None else response.status error['message'] = title else: response = ctx.req.response response.content_type = content_type if code is not None: response.status = code response.headers.update(headers) # try: # text = json.dumps(data, encoding = 'utf-8', ensure_ascii = False, indent = 2) # except UnicodeDecodeError: # text = json.dumps(data, ensure_ascii = True, indent = 2) if json_dumps_default is None: text = json.dumps(data) else: text = json.dumps(data, default = json_dumps_default) text = unicode(text) if jsonp: text = u'{0}({1})'.format(jsonp, text) response.text = text return response def convert_date_to_json(obj): if isinstance(obj, (datetime.datetime, datetime.date)): return obj.isoformat() else: return json.JSONEncoder().default(obj) PKF]rxopenfisca_web_api/contexts.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Context loaded and saved in WSGI requests""" import gettext import os import pkg_resources import webob from . import conf, conv __all__ = ['Ctx', 'null_ctx'] class Ctx(conv.State): _parent = None default_values = dict( _lang = None, _translator = None, req = None, ) env_keys = ('_lang', '_translator') def __init__(self, req = None): if req is not None: self.req = req ctx_env = req.environ.get('openfisca-web-api', {}) for key in object.__getattribute__(self, 'env_keys'): value = ctx_env.get(key) if value is not None: setattr(self, key, value) def __getattribute__(self, name): try: return object.__getattribute__(self, name) except AttributeError: parent = object.__getattribute__(self, '_parent') if parent is None: default_values = object.__getattribute__(self, 'default_values') if name in default_values: return default_values[name] raise return getattr(parent, name) @property def _(self): return self.translator.ugettext def blank_req(self, path, environ = None, base_url = None, headers = None, POST = None, **kw): env = environ.copy() if environ else {} ctx_env = env.setdefault('openfisca-web-api', {}) for key in self.env_keys: value = getattr(self, key) if value is not None: ctx_env[key] = value return webob.Request.blank(path, environ = env, base_url = base_url, headers = headers, POST = POST, **kw) def get_containing(self, name, depth = 0): """Return the n-th (n = ``depth``) context containing attribute named ``name``.""" ctx_dict = object.__getattribute__(self, '__dict__') if name in ctx_dict: if depth <= 0: return self depth -= 1 parent = ctx_dict.get('_parent') if parent is None: return None return parent.get_containing(name, depth = depth) def get_inherited(self, name, default = UnboundLocalError, depth = 1): ctx = self.get_containing(name, depth = depth) if ctx is None: if default is UnboundLocalError: raise AttributeError('Attribute %s not found in %s' % (name, self)) return default return object.__getattribute__(ctx, name) def iter(self): yield self parent = object.__getattribute__(self, '_parent') if parent is not None: for ancestor in parent.iter(): yield ancestor def iter_containing(self, name): ctx_dict = object.__getattribute__(self, '__dict__') if name in ctx_dict: yield self parent = ctx_dict.get('_parent') if parent is not None: for ancestor in parent.iter_containing(name): yield ancestor def iter_inherited(self, name): for ctx in self.iter_containing(name): yield object.__getattribute__(ctx, name) def lang_del(self): del self._lang if self.req is not None and self.req.environ.get('openfisca-web-api') is not None \ and '_lang' in self.req.environ['openfisca-web-api']: del self.req.environ['openfisca-web-api']['_lang'] def lang_get(self): if self._lang is None: self._lang = ['fr-FR', 'fr'] if self.req is not None: self.req.environ.setdefault('openfisca-web-api', {})['_lang'] = self._lang return self._lang def lang_set(self, lang): self._lang = lang if self.req is not None: self.req.environ.setdefault('openfisca-web-api', {})['_lang'] = self._lang # Reinitialize translator for new languages. if self._translator is not None: # Don't del self._translator, because attribute _translator can be defined in a parent. self._translator = None if self.req is not None and self.req.environ.get('openfisca-web-api') is not None \ and '_translator' in self.req.environ['openfisca-web-api']: del self.req.environ['openfisca-web-api']['_translator'] lang = property(lang_get, lang_set, lang_del) def new(self, **kwargs): ctx = Ctx() ctx._parent = self for name, value in kwargs.iteritems(): setattr(ctx, name, value) return ctx @property def parent(self): return object.__getattribute__(self, '_parent') @property def translator(self): """Get a valid translator object from one or several languages names.""" if self._translator is None: languages = self.lang if not languages: return gettext.NullTranslations() if not isinstance(languages, list): languages = [languages] translator = gettext.NullTranslations() for name, i18n_dir in [ ( 'biryani', os.path.join(pkg_resources.get_distribution('biryani').location, 'biryani', 'i18n'), ), ( conf['country_package'].replace('_', '-'), os.path.join(pkg_resources.get_distribution(conf['country_package']).location, conf['country_package'], 'i18n'), ), ]: if i18n_dir is not None: translator = new_translator(name, i18n_dir, languages, fallback = translator) translator = new_translator(conf['package_name'], conf['i18n_dir'], languages, fallback = translator) self._translator = translator return self._translator null_ctx = Ctx() null_ctx.lang = ['fr-FR', 'fr'] def new_translator(domain, localedir, languages, fallback = None): new = gettext.translation(domain, localedir, fallback = True, languages = languages) if fallback is not None: new.add_fallback(fallback) return new PK'Gopenfisca_web_api/conv.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Conversion functions""" import collections from openfisca_core.conv import * # noqa def N_(message): return message # Level 1 converters ini_str_to_list = pipe( cleanup_line, function(lambda value: value.split('\n')), uniform_sequence( cleanup_line, ), ) def jsonify_value(value): if isinstance(value, dict): return collections.OrderedDict( (unicode(item_key), jsonify_value(item_value)) for item_key, item_value in value.iteritems() ) if isinstance(value, list): return [ jsonify_value(item) for item in value ] return value def make_str_to_reforms(): # Defer converter creation for model to load. from . import model def str_to_reforms(value, state = None): if value is None: return value, None if state is None: state = default_state if model.build_reform_function_by_key is None: return value, state._(u'No reform was declared to the API') declared_reforms_key = model.build_reform_function_by_key.keys() return pipe( test_isinstance(list), uniform_sequence( pipe( test_isinstance(basestring), empty_to_none, test_in(declared_reforms_key), ), drop_none_items = True, ), empty_to_none, )(value) return str_to_reforms def module_and_function_names_to_function(values, state = None): import importlib if values is None: return values, None if state is None: state = default_state module_name, function_name = values try: module = importlib.import_module(module_name) except ImportError: return values, {0: state._(u'Module "{}" not found'.format(module_name))} function = getattr(module, function_name, None) if function is None: return values, {1: state._(u'Function "{}" not found in module "{}"'.format(function_name, module))} return function, None str_to_module_and_function_names = function(lambda value: value.rsplit('.', 1)) def make_validate_variable(reforms, base_tax_benefit_system, reform_tax_benefit_system): def validate_variable(value, state = None): if value is None: return value, None if state is None: state = default_state if reforms is None: return test_in( base_tax_benefit_system.column_by_name, error = u'Variable "{}" is unknown in base tax and benefit system'.format(value), )(value) else: is_valid = value in base_tax_benefit_system.column_by_name and \ value in reform_tax_benefit_system.column_by_name return value, None if is_valid else \ u'Variable "{}" must exist in both base and reform tax and benefit system'.format(value) return validate_variable # Level 2 converters module_and_function_str_to_function = pipe( str_to_module_and_function_names, module_and_function_names_to_function, ) PK'Gs openfisca_web_api/application.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Middleware initialization""" import json import logging import sys try: import ipdb except ImportError: ipdb = None from weberror.errormiddleware import ErrorMiddleware import webob from . import conf, controllers, environment, urls log = logging.getLogger(__name__) def launch_debugger_on_exception(app): """WSGI middleware that catches all exceptions and launches a debugger.""" def _launch_debugger_on_exception(environ, start_response): try: return app(environ, start_response) except Exception as exc: log.exception(exc) e, m, tb = sys.exc_info() ipdb.post_mortem(tb) raise return _launch_debugger_on_exception def environment_setter(app): """WSGI middleware that sets request-dependant environment.""" def set_environment(environ, start_response): req = webob.Request(environ) urls.application_url = req.application_url try: return app(req.environ, start_response) except webob.exc.WSGIHTTPException as wsgi_exception: return wsgi_exception(environ, start_response) return set_environment def exception_to_json(app): """WSGI middleware that catches all exceptions and responds a JSON with the message.""" def respond_json_exception(environ, start_response): try: return app(environ, start_response) except Exception as exc: log.exception(exc) start_response('500 Internal Server Error', [('content-type', 'application/json; charset=utf-8')]) return [json.dumps({'error': u'Internal Server Error'})] return respond_json_exception def x_api_version_header_setter(app): """WSGI middleware that sets response X-API-Version header.""" def set_x_api_version_header(environ, start_response): req = webob.Request(environ) res = req.get_response(app) res.headers.update({'X-API-Version': environment.git_head_sha}) return res(environ, start_response) return set_x_api_version_header def make_app(global_conf, **app_conf): """Create a WSGI application and return it ``global_conf`` The inherited configuration for this application. Normally from the [DEFAULT] section of the Paste ini file. ``app_conf`` The application's local configuration. Normally specified in the [app:] section of the Paste ini file (where defaults to main). """ # Configure the environment and fill conf dictionary. environment.load_environment(global_conf, app_conf) # Dispatch request to controllers. app = controllers.make_router() # Init request-dependant environment app = environment_setter(app) # Set X-API-Version response header app = x_api_version_header_setter(app) # CUSTOM MIDDLEWARE HERE (filtered by error handling middlewares) # Handle Python exceptions if conf['debug'] and ipdb is not None: app = launch_debugger_on_exception(app) app = exception_to_json(app) if not conf['debug']: app = ErrorMiddleware(app, global_conf, **conf['errorware']) return app PK'G9ٖopenfisca_web_api/model.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import os import xml.etree from openfisca_core import decompositionsxml, reforms from . import conv # Declarations, initialized in environment module build_reform_function_by_key = None decomposition_json_by_file_path_cache = {} input_variables_and_parameters_by_column_name_cache = {} input_variables_extractor = None parameters_file_path = None parameters_json_cache = None reform_by_full_key = None tax_benefit_system = None TaxBenefitSystem = None def get_cached_composed_reform(reform_keys, tax_benefit_system): full_key = '.'.join( [tax_benefit_system.full_key] + reform_keys if isinstance(tax_benefit_system, reforms.AbstractReform) else reform_keys ) composed_reform = reform_by_full_key.get(full_key) if composed_reform is None: build_reform_functions = [build_reform_function_by_key[reform_key] for reform_key in reform_keys] composed_reform = reforms.compose_reforms( build_functions_and_keys = zip(build_reform_functions, reform_keys), tax_benefit_system = tax_benefit_system, ) assert full_key == composed_reform.full_key reform_by_full_key[full_key] = composed_reform return composed_reform def get_cached_or_new_decomposition_json(tax_benefit_system, xml_file_name = None): if xml_file_name is None: xml_file_name = tax_benefit_system.DEFAULT_DECOMP_FILE global decomposition_json_by_file_path_cache decomposition_json = decomposition_json_by_file_path_cache.get(xml_file_name) if decomposition_json is None: xml_file_path = os.path.join(tax_benefit_system.DECOMP_DIR, xml_file_name) decomposition_tree = xml.etree.ElementTree.parse(xml_file_path) decomposition_xml_json = conv.check(decompositionsxml.xml_decomposition_to_json)( decomposition_tree.getroot()) decomposition_xml_json = conv.check(decompositionsxml.make_validate_node_xml_json(tax_benefit_system))( decomposition_xml_json) decomposition_json = decompositionsxml.transform_node_xml_json_to_json(decomposition_xml_json) decomposition_json_by_file_path_cache[xml_file_name] = decomposition_json return decomposition_json def get_cached_input_variables_and_parameters(column): """This function uses input_variables_extractor and expects the caller to check it is not None.""" assert input_variables_extractor is not None global input_variables_and_parameters_by_column_name_cache input_variables_and_parameters = input_variables_and_parameters_by_column_name_cache.get(column.name) if input_variables_and_parameters is None: input_variables_and_parameters = input_variables_extractor.get_input_variables_and_parameters(column) input_variables_and_parameters_by_column_name_cache[column.name] = input_variables_and_parameters return input_variables_and_parameters PK'G-s#s# openfisca_web_api/environment.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Environment configuration""" import collections import datetime import importlib import logging import os import pkg_resources import subprocess import sys import weakref from biryani import strings from openfisca_core import periods, reforms try: from openfisca_parsers import input_variables_extractors except ImportError: input_variables_extractors = None import openfisca_web_api from . import conf, conv, model app_dir = os.path.dirname(os.path.abspath(__file__)) country_package_git_head_sha = None git_head_sha = None class ValueAndError(list): # Can't be a tuple subclass, because WeakValueDictionary doesn't work with (sub)tuples. pass def get_git_head_sha(cwd = os.path.dirname(__file__)): output = subprocess.check_output(['git', 'rev-parse', '--verify', 'HEAD'], cwd=cwd) return output.rstrip('\n') def get_parameters_file_path(): country_package_dir_path = pkg_resources.get_distribution(conf['country_package']).location parameters_file_absolute_path = model.tax_benefit_system.legislation_xml_file_path parameters_file_path = parameters_file_absolute_path[len(country_package_dir_path):] if parameters_file_path.startswith('/'): parameters_file_path = parameters_file_path[1:] return parameters_file_path def load_environment(global_conf, app_conf): """Configure the application environment.""" conf = openfisca_web_api.conf # Empty dictionary conf.update(strings.deep_decode(global_conf)) conf.update(strings.deep_decode(app_conf)) conf.update(conv.check(conv.struct( { 'app_conf': conv.set_value(app_conf), 'app_dir': conv.set_value(app_dir), 'country_package': conv.pipe( conv.make_input_to_slug(separator = u'_'), conv.test_in(( u'openfisca_france', u'openfisca_tunisia', u'openfisca_tunisia_pension', )), conv.not_none, ), 'debug': conv.pipe(conv.guess_bool, conv.default(False)), 'global_conf': conv.set_value(global_conf), 'i18n_dir': conv.default(os.path.join(app_dir, 'i18n')), 'load_alert': conv.pipe(conv.guess_bool, conv.default(False)), 'log_level': conv.pipe( conv.default('WARNING'), conv.function(lambda log_level: getattr(logging, log_level.upper())), ), 'package_name': conv.default('openfisca-web-api'), 'realm': conv.default(u'OpenFisca Web API'), 'reforms': conv.ini_str_to_list, # Another validation is done below. }, default = 'drop', ))(conf)) # Configure logging. logging.basicConfig(level = conf['log_level'], stream = sys.stderr) errorware = conf.setdefault('errorware', {}) errorware['debug'] = conf['debug'] if not errorware['debug']: errorware['error_email'] = conf['email_to'] errorware['error_log'] = conf.get('error_log', None) errorware['error_message'] = conf.get('error_message', 'An internal server error occurred') errorware['error_subject_prefix'] = conf.get('error_subject_prefix', 'OpenFisca Web API Error: ') errorware['from_address'] = conf['from_address'] errorware['smtp_server'] = conf.get('smtp_server', 'localhost') # Initialize tax-benefit system. country_package = importlib.import_module(conf['country_package']) CountryTaxBenefitSystem = country_package.init_country() class Scenario(CountryTaxBenefitSystem.Scenario): instance_and_error_couple_cache = {} if conf['debug'] else weakref.WeakValueDictionary() # class attribute @classmethod def make_json_to_cached_or_new_instance(cls, ctx, repair, tax_benefit_system): def json_to_cached_or_new_instance(value, state = None): key = (unicode(ctx.lang), unicode(value), repair, tax_benefit_system) instance_and_error_couple = cls.instance_and_error_couple_cache.get(key) if instance_and_error_couple is None: instance_and_error_couple = cls.make_json_to_instance(repair, tax_benefit_system)( value, state = state or conv.default_state) # Note: Call to ValueAndError() is needed below, otherwise it raises TypeError: cannot create # weak reference to 'tuple' object. cls.instance_and_error_couple_cache[key] = ValueAndError(instance_and_error_couple) return instance_and_error_couple return json_to_cached_or_new_instance class TaxBenefitSystem(CountryTaxBenefitSystem): pass TaxBenefitSystem.Scenario = Scenario model.TaxBenefitSystem = TaxBenefitSystem model.tax_benefit_system = tax_benefit_system = TaxBenefitSystem() tax_benefit_system.prefill_cache() # Initialize reforms build_reform_functions = conv.check( conv.uniform_sequence( conv.module_and_function_str_to_function, ) )(conf['reforms']) if build_reform_functions is not None: api_reforms = [ build_reform(tax_benefit_system) for build_reform in build_reform_functions ] api_reforms = conv.check( conv.uniform_sequence(conv.test_isinstance(reforms.AbstractReform)) )(api_reforms) model.build_reform_function_by_key = { reform.key: build_reform for build_reform, reform in zip(build_reform_functions, api_reforms) } model.reform_by_full_key = { reform.full_key: reform for reform in api_reforms } # Cache default decomposition. model.get_cached_or_new_decomposition_json(tax_benefit_system) # Compute and cache compact legislation for each first day of month since at least 2 legal years. today = periods.instant(datetime.date.today()) first_day_of_year = today.offset('first-of', 'year') instant = first_day_of_year.offset(-2, 'year') two_years_later = first_day_of_year.offset(2, 'year') while instant < two_years_later: tax_benefit_system.get_compact_legislation(instant) instant = instant.offset(1, 'month') # Initialize lib2to3-based input variables extractor. if input_variables_extractors is not None: model.input_variables_extractor = input_variables_extractors.setup(tax_benefit_system) # Store Git last commit SHA global git_head_sha git_head_sha = get_git_head_sha() global country_package_git_head_sha country_package_git_head_sha = get_git_head_sha(cwd = country_package.__path__[0]) # Store parameters_file_path cache model.parameters_file_path = get_parameters_file_path() # Store parameters_json cache parameters_json = [] walk_legislation_json( tax_benefit_system.legislation_json, descriptions = [], parameters_json = parameters_json, path_fragments = [], ) model.parameters_json_cache = parameters_json def walk_legislation_json(node_json, descriptions, parameters_json, path_fragments): children_json = node_json.get('children') or None if children_json is None: parameter_json = node_json.copy() # No need to deepcopy since it is a leaf. description = u' ; '.join( fragment for fragment in descriptions + [node_json.get('description')] if fragment ) parameter_json['description'] = description parameter_json['name'] = u'.'.join(path_fragments) parameter_json = collections.OrderedDict(sorted(parameter_json.iteritems())) parameters_json.append(parameter_json) else: for child_name, child_json in children_json.iteritems(): walk_legislation_json( child_json, descriptions = descriptions + [node_json.get('description')], parameters_json = parameters_json, path_fragments = path_fragments + [child_name], ) PK'G 2 2)openfisca_web_api/controllers/simulate.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Simulate controller""" from __future__ import division import collections import copy import multiprocessing import os from openfisca_core import decompositions from .. import conf, contexts, conv, model, wsgihelpers cpu_count = multiprocessing.cpu_count() def N_(message): return message @wsgihelpers.wsgify def api1_simulate(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'POST', req.method if conf['load_alert']: try: load_average = os.getloadavg() except (AttributeError, OSError): # When load average is not available, always accept request. pass else: if load_average[0] / cpu_count > 1: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 503, # Service Unavailable message = ctx._(u'Server is overloaded: {} {} {}').format(*load_average), ).iteritems())), method = req.script_name, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) content_type = req.content_type if content_type is not None: content_type = content_type.split(';', 1)[0].strip() if content_type != 'application/json': return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request message = ctx._(u'Bad content-type: {}').format(content_type), ).iteritems())), method = req.script_name, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) inputs, error = conv.pipe( conv.make_input_to_json(object_pairs_hook = collections.OrderedDict), conv.test_isinstance(dict), conv.not_none, )(req.body, state = ctx) if error is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(error)], message = ctx._(u'Invalid JSON in request POST body'), ).iteritems())), method = req.script_name, params = req.body, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) str_to_reforms = conv.make_str_to_reforms() data, errors = conv.struct( dict( # api_key = conv.pipe( # Shared secret between client and server # conv.test_isinstance(basestring), # conv.input_to_uuid_str, # conv.not_none, # ), base_reforms = str_to_reforms, context = conv.test_isinstance(basestring), # For asynchronous calls reforms = str_to_reforms, scenarios = conv.pipe( conv.test_isinstance(list), conv.uniform_sequence( conv.not_none, # Real conversion is done once tax-benefit system is known. ), conv.test(lambda scenarios: len(scenarios) >= 1, error = N_(u'At least one scenario is required')), conv.test(lambda scenarios: len(scenarios) <= 100, error = N_(u"There can't be more than 100 scenarios")), conv.not_none, ), trace = conv.pipe( conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), validate = conv.pipe( conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), ), )(inputs, state = ctx) if errors is None: country_tax_benefit_system = model.tax_benefit_system base_tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['base_reforms'], tax_benefit_system = country_tax_benefit_system, ) if data['base_reforms'] is not None else country_tax_benefit_system if data['reforms'] is not None: reform_tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['reforms'], tax_benefit_system = base_tax_benefit_system, ) base_scenarios, base_scenarios_errors = conv.uniform_sequence( base_tax_benefit_system.Scenario.make_json_to_cached_or_new_instance( ctx = ctx, repair = data['validate'], tax_benefit_system = base_tax_benefit_system, ) )(data['scenarios'], state = ctx) errors = {'scenarios': base_scenarios_errors} if base_scenarios_errors is not None else None if errors is None and data['reforms'] is not None: reform_scenarios, reform_scenarios_errors = conv.uniform_sequence( reform_tax_benefit_system.Scenario.make_json_to_cached_or_new_instance( ctx = ctx, repair = data['validate'], tax_benefit_system = reform_tax_benefit_system, ) )(data['scenarios'], state = ctx) errors = {'scenarios': reform_scenarios_errors} if reform_scenarios_errors is not None else None if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) # api_key = data['api_key'] # account = model.Account.find_one( # dict( # api_key = api_key, # ), # as_class = collections.OrderedDict, # ) # if account is None: # return wsgihelpers.respond_json(ctx, # collections.OrderedDict(sorted(dict( # apiVersion = 1, # context = data['context'], # error = collections.OrderedDict(sorted(dict( # code = 401, # Unauthorized # message = ctx._('Unknown API Key: {}').format(api_key), # ).iteritems())), # method = req.script_name, # params = inputs, # url = req.url.decode('utf-8'), # ).iteritems())), # headers = headers, # ) scenarios = base_scenarios if data['reforms'] is None else reform_scenarios suggestions = {} for scenario_index, scenario in enumerate(scenarios): if data['validate']: original_test_case = scenario.test_case scenario.test_case = copy.deepcopy(original_test_case) suggestion = scenario.suggest() # This modifies scenario.test_case! if data['validate']: scenario.test_case = original_test_case if suggestion is not None: suggestions.setdefault('scenarios', {})[scenario_index] = suggestion if not suggestions: suggestions = None if data['validate']: # Only a validation is requested. Don't launch simulation return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), method = req.script_name, params = inputs, repaired_scenarios = [ scenario.to_json() for scenario in scenarios ], suggestions = suggestions, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) decomposition_json = model.get_cached_or_new_decomposition_json(tax_benefit_system = base_tax_benefit_system) base_simulations = [scenario.new_simulation(trace = data['trace']) for scenario in base_scenarios] base_response_json = decompositions.calculate(base_simulations, decomposition_json) if data['reforms'] is not None: reform_decomposition_json = model.get_cached_or_new_decomposition_json( tax_benefit_system = reform_tax_benefit_system, ) reform_simulations = [scenario.new_simulation(trace = data['trace']) for scenario in reform_scenarios] reform_response_json = decompositions.calculate(reform_simulations, reform_decomposition_json) if data['trace']: simulations_variables_json = [] tracebacks_json = [] simulations = reform_simulations if data['reforms'] is not None else base_simulations for simulation in simulations: simulation_variables_json = {} traceback_json = [] for (variable_name, period), step in simulation.traceback.iteritems(): holder = step['holder'] if variable_name not in simulation_variables_json: variable_value_json = holder.to_value_json() if variable_value_json is not None: simulation_variables_json[variable_name] = variable_value_json input_variables_infos = step.get('input_variables_infos') column = holder.column traceback_json.append(dict( cell_type = column.val_type, # Unification with OpenFisca Julia name. default_input_variables = step.get('default_input_variables', False), entity = column.entity, input_variables = [ (variable_name, str(variable_period)) for variable_name, variable_period in input_variables_infos ] if input_variables_infos else None, is_computed = step.get('is_computed', False), label = column.label if column.label != variable_name else None, name = variable_name, period = str(period) if period is not None else None, )) simulations_variables_json.append(simulation_variables_json) tracebacks_json.append(traceback_json) else: simulations_variables_json = None tracebacks_json = None response_data = dict( apiVersion = 1, context = data['context'], method = req.script_name, params = inputs, suggestions = suggestions, tracebacks = tracebacks_json, url = req.url.decode('utf-8'), value = reform_response_json if data['reforms'] is not None else base_response_json, variables = simulations_variables_json, ) if data['reforms'] is not None: response_data['base_value'] = base_response_json return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(response_data.iteritems())), headers = headers, ) PK'GtCww+openfisca_web_api/controllers/parameters.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Parameters controller""" import collections from openfisca_core import legislations, periods from .. import contexts, conv, environment, model, wsgihelpers @wsgihelpers.wsgify def api1_parameters(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( instant = params.get('instant'), names = params.getall('name'), ) parameters_name = [ parameter_json['name'] for parameter_json in model.parameters_json_cache ] data, errors = conv.pipe( conv.struct( dict( instant = conv.pipe( conv.empty_to_none, conv.test_isinstance(basestring), conv.function(lambda str: periods.instant(str)), ), names = conv.pipe( conv.uniform_sequence( conv.pipe( conv.empty_to_none, conv.test_in(parameters_name, error = u'Parameter does not exist'), ), drop_none_items = True, ), conv.empty_to_none, ), ), default = 'drop', ), )(inputs, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) tax_benefit_system = model.tax_benefit_system if data['instant'] is None: if data['names'] is None: parameters_json = model.parameters_json_cache else: parameters_json = [ parameter_json for parameter_json in model.parameters_json_cache if parameter_json['name'] in data['names'] ] else: instant = data['instant'] parameters_json = [] dated_legislation_json = legislations.generate_dated_legislation_json( tax_benefit_system.legislation_json, instant, ) for name in data['names']: name_fragments = name.split('.') parameter_json = dated_legislation_json for name_fragment in name_fragments: parameter_json = parameter_json['children'][name_fragment] parameter_json['name'] = name parameter_json_in_cache = [ parameter_json1 for parameter_json1 in model.parameters_json_cache if parameter_json1['name'] == name ][0] parameter_json['description'] = parameter_json_in_cache['description'] parameters_json.append(parameter_json) return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, country_package_git_head_sha = environment.country_package_git_head_sha, currency = tax_benefit_system.CURRENCY, method = req.script_name, parameters = parameters_json, parameters_file_path = model.parameters_file_path, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) PK'Geĭ(openfisca_web_api/controllers/swagger.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Swagger controller""" import datetime import pkg_resources from . import formula from .formula import default_period from .. import contexts, model, wsgihelpers PACKAGE_VERSION = pkg_resources.get_distribution('OpenFisca-Web-API').version SWAGGER_BASE_PATH = '/api/2/formula' @wsgihelpers.wsgify def api1_swagger(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) return wsgihelpers.respond_json(ctx, build_json(), headers = headers) def build_json(): result = build_metadata() result['paths'] = build_paths() return result def build_metadata(): return { 'swagger': '2.0', 'basePath': SWAGGER_BASE_PATH, 'info': { 'version': PACKAGE_VERSION, 'title': 'OpenFisca', 'description': formula.api2_formula.__doc__, 'termsOfService': 'http://github.com/openfisca/openfisca-web-api', 'contact': { 'name': 'OpenFisca team', 'email': 'contact@openfisca.fr', 'url': 'http://github.com/openfisca/openfisca-web-api/issues' }, 'license': { 'name': 'AGPL', 'url': 'https://www.gnu.org/licenses/agpl-3.0.html' } }, 'parameters': { 'periodParam': { 'name': 'period', 'in': 'path', 'required': True, # Swagger spec: in path => required; pattern thus allows empty values 'default': default_period(), 'description': 'The period for which the given taxes are to be computed', 'type': 'string', 'format': 'period', 'pattern': '^([12]\d{3}(\-\d{2}){0,2})?$' } } } def build_paths(): return { '/{period}/' + name: { 'get': map_path_to_swagger(column) } for name, column in model.tax_benefit_system.column_by_name.iteritems() if not column.is_input_variable() } def map_path_to_swagger(column): column_json = column.to_json() result = map_path_base_to_swagger(column_json) result['responses'] = make_responses_for(column_json) try: result['parameters'] = get_parameters(column) except Exception, e: print('Error mapping parameters of formula "{}":'.format(column.to_json().get('name'))) # noqa print(e) # noqa return result def map_path_base_to_swagger(column_json): result = { 'summary': column_json.get('label'), 'tags': [column_json.get('entity')], } if column_json.get('url'): result['externalDocs'] = {'url': column_json.get('url')} return result def make_responses_for(column_json): return { 200: { 'description': column_json.get('label'), 'schema': make_response_schema_for(column_json) }, 400: { 'description': 'At least one of the sent parameters could not be parsed.', 'schema': { 'type': 'object', 'required': [ 'params', 'error', 'apiVersion' ], 'properties': { 'params': { 'type': 'object' }, 'error': { 'type': 'object', 'required': [ 'message' ], 'properties': { 'message': { 'type': 'string' } } }, 'apiVersion': { 'type': 'string', 'format': 'semver' } } } } } def make_response_schema_for(column_json): return { "type": "object", "required": [ "apiVersion", "values", "params", "period" ], "properties": { "apiVersion": { "type": "string", "format": "semver" }, "values": { "type": "object", "required": [ column_json.get('name') ], "properties": { column_json.get('name'): map_type_to_swagger(column_json.get('@type', 'string')) } }, "params": { "type": "object" }, "period": { "type": "array" } } } def get_parameters(column): result = map_parameters_to_swagger(column) result.append({ '$ref': '#/parameters/periodParam' }) return result def map_parameters_to_swagger(column): input_variables, parameters = model.get_cached_input_variables_and_parameters(column) return [ map_parameter_to_swagger(model.tax_benefit_system.column_by_name[variable_name]) for variable_name in input_variables ] def map_parameter_to_swagger(column): column_json = column.to_json() result = map_type_to_swagger(column_json.get('@type')) if column_json.get('labels'): result['enum'] = column_json.get('labels').values() result.update({ 'name': column_json.get('name'), 'description': column_json.get('label'), 'default': get_default_value(column, column_json), 'in': 'query' }) return result def get_default_value(column, column_json = None): result = column.default if isinstance(result, datetime.date): result = '%s-%s-%s' % (result.year, result.month, result.day) elif column_json.get('labels'): # the default value is actually the key to the array of allowed values if column_json is None: column_json = column.to_json() result = column_json.get('labels').get(result) return result # Transforms a Python type to a Swagger type def map_type_to_swagger(type): result = {'type': type.lower()} if type == 'Integer': result['format'] = 'int32' elif type == 'Float': result['type'] = 'number' result['format'] = 'float' elif type == 'Date': result['type'] = 'string' result['format'] = 'date' elif type == 'Enumeration': result['type'] = 'string' return result PK'G:W< < )openfisca_web_api/controllers/__init__.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Controllers""" import collections from . import calculate, entities, field, fields, formula, graph, parameters, reforms, simulate, swagger, variables from .. import contexts, urls, wsgihelpers router = None @wsgihelpers.wsgify def index(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, message = u'Welcome, this is OpenFisca Web API.', method = req.script_name, ).iteritems())), headers = headers, ) def make_router(): """Return a WSGI application that searches requests to controllers """ global router routings = [ ('GET', '^/$', index), ('GET', '^/api/?$', index), ('POST', '^/api/1/calculate/?$', calculate.api1_calculate), ('GET', '^/api/1/entities/?$', entities.api1_entities), ('GET', '^/api/1/field/?$', field.api1_field), ('GET', '^/api/1/fields/?$', fields.api1_fields), ('GET', '^/api/1/formula/(?P[^/]+)/?$', formula.api1_formula), ('GET', '^/api/2/formula/(?:(?P[A-Za-z0-9:-]*)/)?(?P[A-Za-z0-9_+-]+)/?$', formula.api2_formula), ('GET', '^/api/1/graph/?$', graph.api1_graph), ('GET', '^/api/1/parameters/?$', parameters.api1_parameters), ('GET', '^/api/1/reforms/?$', reforms.api1_reforms), ('POST', '^/api/1/simulate/?$', simulate.api1_simulate), ('GET', '^/api/1/swagger$', swagger.api1_swagger), ('GET', '^/api/1/variables/?$', variables.api1_variables), ] router = urls.make_router(*routings) return router PK'GXp p *openfisca_web_api/controllers/variables.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Variables controller""" import collections import datetime from openfisca_core import periods, simulations from .. import contexts, conv, environment, model, wsgihelpers @wsgihelpers.wsgify def api1_variables(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( names = params.getall('name'), ) tax_benefit_system = model.tax_benefit_system tax_benefit_system_variables_name = tax_benefit_system.column_by_name.keys() data, errors = conv.pipe( conv.struct( dict( names = conv.pipe( conv.uniform_sequence( conv.pipe( conv.empty_to_none, conv.test_in(tax_benefit_system_variables_name, error = u'Variable does not exist'), ), drop_none_items = True, ), conv.empty_to_none, ), ), default = 'drop', ), )(inputs, state = ctx) simulation = None variables_json = [] for variable_name in data['names'] or tax_benefit_system_variables_name: column = tax_benefit_system.column_by_name[variable_name] variable_json = column.to_json() label = variable_json.get('label') if label is not None and label == variable_name: del variable_json['label'] if not column.is_input_variable(): if simulation is None: simulation = simulations.Simulation( period = periods.period(datetime.date.today().year), tax_benefit_system = model.tax_benefit_system, ) holder = simulation.get_or_new_holder(variable_name) variable_json['formula'] = holder.formula.to_json( get_input_variables_and_parameters = model.get_cached_input_variables_and_parameters, ) variables_json.append(variable_json) return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, country_package_git_head_sha = environment.country_package_git_head_sha, method = req.script_name, url = req.url.decode('utf-8'), variables = variables_json, ).iteritems())), headers = headers, ) PK'Gû`/fLfL*openfisca_web_api/controllers/calculate.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Calculate controller""" from __future__ import division import collections import copy import itertools import multiprocessing import os import time from .. import conf, contexts, conv, model, wsgihelpers cpu_count = multiprocessing.cpu_count() def N_(message): return message def build_and_calculate_simulations(variables_name, scenarios, trace = False): simulations = [] for scenario in scenarios: simulation = scenario.new_simulation(trace = trace) for variable_name in variables_name: simulation.calculate_output(variable_name) simulations.append(simulation) return simulations def build_output_variables(simulations, use_label, variables): return [ { variable: simulation.get_holder(variable).to_value_json(use_label = use_label) for variable in variables } for simulation in simulations ] def fill_test_cases_with_values(intermediate_variables, scenarios, simulations, use_label, variables): output_test_cases = [] for scenario, simulation in itertools.izip(scenarios, simulations): if intermediate_variables: holders = [] for step in simulation.traceback.itervalues(): holder = step['holder'] if holder not in holders: holders.append(holder) else: holders = [ simulation.get_holder(variable) for variable in variables ] test_case = scenario.to_json()['test_case'] for holder in holders: variable_value_json = holder.to_value_json(use_label = use_label) if variable_value_json is None: continue variable_name = holder.column.name entity_members = test_case[holder.entity.key_plural] if isinstance(variable_value_json, dict): for entity_member_index, entity_member in enumerate(entity_members): entity_member[variable_name] = { period: array_json[entity_member_index] for period, array_json in variable_value_json.iteritems() } else: for entity_member, cell_json in itertools.izip(entity_members, variable_value_json): entity_member[variable_name] = cell_json output_test_cases.append(test_case) return output_test_cases @wsgihelpers.wsgify def api1_calculate(req): total_start_time = time.time() ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'POST', req.method if conf['load_alert']: try: load_average = os.getloadavg() except (AttributeError, OSError): # When load average is not available, always accept request. pass else: if load_average[0] / cpu_count > 1: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 503, # Service Unavailable message = ctx._(u'Server is overloaded: {} {} {}').format(*load_average), ).iteritems())), method = req.script_name, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) content_type = req.content_type if content_type is not None: content_type = content_type.split(';', 1)[0].strip() if content_type != 'application/json': return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request message = ctx._(u'Bad content-type: {}').format(content_type), ).iteritems())), method = req.script_name, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) inputs, error = conv.pipe( conv.make_input_to_json(object_pairs_hook = collections.OrderedDict), conv.test_isinstance(dict), conv.not_none, )(req.body, state = ctx) if error is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(error)], message = ctx._(u'Invalid JSON in request POST body'), ).iteritems())), method = req.script_name, params = req.body, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) str_to_reforms = conv.make_str_to_reforms() data, errors = conv.struct( dict( # api_key = conv.pipe( # Shared secret between client and server # conv.test_isinstance(basestring), # conv.input_to_uuid_str, # conv.not_none, # ), base_reforms = str_to_reforms, context = conv.test_isinstance(basestring), # For asynchronous calls intermediate_variables = conv.pipe( conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), labels = conv.pipe( # Return labels (of enumerations) instead of numeric values. conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), output_format = conv.pipe( conv.test_isinstance(basestring), conv.test_in(['test_case', 'variables']), conv.default('test_case'), ), reforms = str_to_reforms, scenarios = conv.pipe( conv.test_isinstance(list), conv.uniform_sequence( conv.not_none, # Real conversion is done once tax-benefit system is known. ), conv.test(lambda scenarios: len(scenarios) >= 1, error = N_(u'At least one scenario is required')), conv.test(lambda scenarios: len(scenarios) <= 100, error = N_(u"There can't be more than 100 scenarios")), conv.not_none, ), time = conv.pipe( conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), trace = conv.pipe( conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), validate = conv.pipe( conv.test_isinstance((bool, int)), conv.anything_to_bool, conv.default(False), ), variables = conv.pipe( conv.test_isinstance(list), conv.uniform_sequence( conv.pipe( conv.test_isinstance(basestring), conv.empty_to_none, # Remaining of conversion is done once tax-benefit system is known. conv.not_none, ), constructor = set, ), conv.test(lambda variables: len(variables) >= 1, error = N_(u'At least one variable is required')), conv.not_none, ), ), )(inputs, state = ctx) if errors is None: compose_reforms_start_time = time.time() country_tax_benefit_system = model.tax_benefit_system base_tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['base_reforms'], tax_benefit_system = country_tax_benefit_system, ) if data['base_reforms'] is not None else country_tax_benefit_system if data['reforms'] is not None: reform_tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['reforms'], tax_benefit_system = base_tax_benefit_system, ) compose_reforms_end_time = time.time() compose_reforms_time = compose_reforms_end_time - compose_reforms_start_time build_scenarios_start_time = time.time() base_scenarios, base_scenarios_errors = conv.uniform_sequence( base_tax_benefit_system.Scenario.make_json_to_cached_or_new_instance( ctx = ctx, repair = data['validate'], tax_benefit_system = base_tax_benefit_system, ) )(data['scenarios'], state = ctx) errors = {'scenarios': base_scenarios_errors} if base_scenarios_errors is not None else None if errors is None and data['reforms'] is not None: reform_scenarios, reform_scenarios_errors = conv.uniform_sequence( reform_tax_benefit_system.Scenario.make_json_to_cached_or_new_instance( ctx = ctx, repair = data['validate'], tax_benefit_system = reform_tax_benefit_system, ) )(data['scenarios'], state = ctx) errors = {'scenarios': reform_scenarios_errors} if reform_scenarios_errors is not None else None build_scenarios_end_time = time.time() build_scenarios_time = build_scenarios_end_time - build_scenarios_start_time if errors is None: data, errors = conv.struct( dict( variables = conv.uniform_sequence( conv.make_validate_variable( base_tax_benefit_system = base_tax_benefit_system, reform_tax_benefit_system = reform_tax_benefit_system if data['reforms'] else None, reforms = data['reforms'], ), ), ), default = conv.noop, )(data, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) # api_key = data['api_key'] # account = model.Account.find_one( # dict( # api_key = api_key, # ), # as_class = collections.OrderedDict, # ) # if account is None: # return wsgihelpers.respond_json(ctx, # collections.OrderedDict(sorted(dict( # apiVersion = 1, # context = data['context'], # error = collections.OrderedDict(sorted(dict( # code = 401, # Unauthorized # message = ctx._('Unknown API Key: {}').format(api_key), # ).iteritems())), # method = req.script_name, # params = inputs, # url = req.url.decode('utf-8'), # ).iteritems())), # headers = headers, # ) scenarios = base_scenarios if data['reforms'] is None else reform_scenarios suggestions = {} for scenario_index, scenario in enumerate(scenarios): if data['validate']: original_test_case = scenario.test_case scenario.test_case = copy.deepcopy(original_test_case) suggestion = scenario.suggest() # This modifies scenario.test_case! if data['validate']: scenario.test_case = original_test_case if suggestion is not None: suggestions.setdefault('scenarios', {})[scenario_index] = suggestion if not suggestions: suggestions = None if data['validate']: # Only a validation is requested. Don't launch simulation total_end_time = time.time() total_time = total_end_time - total_start_time response_data = dict( apiVersion = 1, context = inputs.get('context'), method = req.script_name, params = inputs, repaired_scenarios = [ scenario.to_json() for scenario in scenarios ], suggestions = suggestions, url = req.url.decode('utf-8'), ) if data['time']: response_data['time'] = collections.OrderedDict(sorted(dict( build_scenarios = build_scenarios_time, compose_reforms = compose_reforms_time, total = total_time, ).iteritems())), return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(response_data.iteritems())), headers = headers, ) calculate_simulation_start_time = time.time() base_simulations = build_and_calculate_simulations( scenarios = base_scenarios, trace = data['intermediate_variables'] or data['trace'], variables_name = data['variables'], ) if data['reforms'] is not None: reform_simulations = build_and_calculate_simulations( scenarios = reform_scenarios, trace = data['intermediate_variables'] or data['trace'], variables_name = data['variables'], ) calculate_simulation_end_time = time.time() calculate_simulation_time = calculate_simulation_end_time - calculate_simulation_start_time if data['output_format'] == 'test_case': base_value = fill_test_cases_with_values( intermediate_variables = data['intermediate_variables'], scenarios = base_scenarios, simulations = base_simulations, use_label = data['labels'], variables = data['variables'], ) if data['reforms'] is not None: reform_value = fill_test_cases_with_values( intermediate_variables = data['intermediate_variables'], scenarios = reform_scenarios, simulations = reform_simulations, use_label = data['labels'], variables = data['variables'], ) else: assert data['output_format'] == 'variables' base_value = build_output_variables( simulations = base_simulations, use_label = data['labels'], variables = data['variables'], ) if data['reforms'] is not None: reform_value = build_output_variables( simulations = reform_simulations, use_label = data['labels'], variables = data['variables'], ) if data['trace']: simulations_variables_json = [] tracebacks_json = [] simulations = reform_simulations if data['reforms'] is not None else base_simulations for simulation in simulations: simulation_variables_json = {} traceback_json = [] for (variable_name, period), step in simulation.traceback.iteritems(): holder = step['holder'] if variable_name not in simulation_variables_json: variable_value_json = holder.to_value_json() if variable_value_json is not None: simulation_variables_json[variable_name] = variable_value_json column = holder.column input_variables_infos = step.get('input_variables_infos') parameters_infos = step.get('parameters_infos') traceback_json.append(collections.OrderedDict(sorted(dict( cell_type = column.val_type, # Unification with OpenFisca Julia name. default_input_variables = step.get('default_input_variables', False), entity = column.entity, input_variables = [ (input_variable_name, str(input_variable_period)) for input_variable_name, input_variable_period in input_variables_infos ] if input_variables_infos else None, is_computed = step.get('is_computed', False), label = column.label if column.label != variable_name else None, name = variable_name, parameters = parameters_infos or None, period = str(period) if period is not None else None, ).iteritems()))) simulations_variables_json.append(simulation_variables_json) tracebacks_json.append(traceback_json) else: simulations_variables_json = None tracebacks_json = None response_data = collections.OrderedDict(sorted(dict( apiVersion = 1, context = data['context'], method = req.script_name, params = inputs, suggestions = suggestions, tracebacks = tracebacks_json, url = req.url.decode('utf-8'), value = reform_value if data['reforms'] is not None else base_value, variables = simulations_variables_json, ).iteritems())) if data['reforms'] is not None: response_data['base_value'] = base_value total_end_time = time.time() total_time = total_end_time - total_start_time if data['time']: response_data['time'] = collections.OrderedDict(sorted(dict( build_scenarios = build_scenarios_time, compose_reforms = compose_reforms_time, calculate_simulation = calculate_simulation_time, total = total_time, ).iteritems())) return wsgihelpers.respond_json(ctx, response_data, headers = headers) PK'G0'openfisca_web_api/controllers/fields.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Fields controller""" import collections import copy from .. import contexts, conv, model, wsgihelpers def get_column_json(column): column_json = column.to_json() column_json['entity'] = column.entity # Overwrite with symbol instead of key plural for compatibility. return column_json @wsgihelpers.wsgify def api1_fields(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( context = params.get('context'), ) data, errors = conv.pipe( conv.struct( dict( context = conv.noop, # For asynchronous calls ), default = 'drop', ), )(inputs, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) columns = collections.OrderedDict( (name, get_column_json(column)) for name, column in model.tax_benefit_system.column_by_name.iteritems() if name not in ('idfam', 'idfoy', 'idmen', 'noi', 'quifam', 'quifoy', 'quimen') if column.is_input_variable() ) columns_tree = collections.OrderedDict( ( dict( fam = 'familles', foy = 'foyers_fiscaux', ind = 'individus', men = 'menages', )[entity], copy.deepcopy(tree), ) for entity, tree in model.tax_benefit_system.columns_name_tree_by_entity.iteritems() ) prestations = collections.OrderedDict( (name, get_column_json(column)) for name, column in model.tax_benefit_system.column_by_name.iteritems() if not column.is_input_variable() ) return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiStatus = u'deprecated', apiVersion = 1, columns = columns, columns_tree = columns_tree, context = data['context'], method = req.script_name, params = inputs, prestations = prestations, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) PK'G5  &openfisca_web_api/controllers/field.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Field controller""" import collections import datetime from openfisca_core import periods, simulations from .. import contexts, conv, model, wsgihelpers @wsgihelpers.wsgify def api1_field(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( context = params.get('context'), input_variables = params.get('input_variables'), reforms = params.getall('reform'), variable = params.get('variable'), ) str_to_reforms = conv.make_str_to_reforms() data, errors = conv.pipe( conv.struct( dict( context = conv.noop, # For asynchronous calls input_variables = conv.pipe( conv.test_isinstance((bool, int, basestring)), conv.anything_to_bool, conv.default(True), ), reforms = str_to_reforms, variable = conv.noop, # Real conversion is done once tax-benefit system is known. ), default = 'drop', ), )(inputs, state = ctx) if errors is None: country_tax_benefit_system = model.tax_benefit_system tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['reforms'], tax_benefit_system = country_tax_benefit_system, ) if data['reforms'] is not None else country_tax_benefit_system data, errors = conv.struct( dict( variable = conv.pipe( conv.empty_to_none, conv.default(u'revdisp'), conv.test_in(tax_benefit_system.column_by_name), ), ), default = conv.noop, )(data, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) variable_name = data['variable'] column = tax_benefit_system.column_by_name[variable_name] value_json = column.to_json() if data['input_variables'] and not column.is_input_variable(): simulation = simulations.Simulation( period = periods.period(datetime.date.today().year), tax_benefit_system = tax_benefit_system, ) holder = simulation.get_or_new_holder(variable_name) value_json['formula'] = holder.formula.to_json( get_input_variables_and_parameters = model.get_cached_input_variables_and_parameters, with_input_variables_details = True, ) value_json['entity'] = column.entity # Overwrite with symbol instead of key plural for compatibility. return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiStatus = u'deprecated', apiVersion = 1, context = data['context'], method = req.script_name, params = inputs, url = req.url.decode('utf-8'), value = value_json, ).iteritems())), headers = headers, ) PK'G@7 (openfisca_web_api/controllers/reforms.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Reforms controller""" import collections from .. import contexts, conv, model, wsgihelpers @wsgihelpers.wsgify def api1_reforms(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( context = params.get('context'), ) data, errors = conv.pipe( conv.struct( dict( context = conv.noop, # For asynchronous calls ), default = 'drop', ), )(inputs, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) build_reform_function_by_key = model.build_reform_function_by_key declared_reforms_key = build_reform_function_by_key.keys() \ if build_reform_function_by_key is not None \ else None reforms = collections.OrderedDict(sorted({ reform_key: reform.name for reform_key, reform in model.reform_by_full_key.iteritems() }.iteritems())) if declared_reforms_key is not None else None return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = data['context'], method = req.script_name, params = inputs, reforms = reforms, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) PK'G??)openfisca_web_api/controllers/entities.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Entities controller""" import collections from .. import contexts, conv, model, wsgihelpers def build_entity_data(entity_class): entity_data = {'label': entity_class.label} if entity_class.is_persons_entity: entity_data['isPersonsEntity'] = entity_class.is_persons_entity else: entity_data.update({ 'maxCardinalityByRoleKey': entity_class.max_cardinality_by_role_key, 'roles': entity_class.roles_key, 'labelByRoleKey': entity_class.label_by_role_key, }) return entity_data @wsgihelpers.wsgify def api1_entities(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( context = params.get('context'), reforms = params.getall('reform'), ) str_to_reforms = conv.make_str_to_reforms() data, errors = conv.pipe( conv.struct( dict( context = conv.noop, # For asynchronous calls reforms = str_to_reforms, ), default = 'drop', ), )(inputs, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) country_tax_benefit_system = model.tax_benefit_system tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['reforms'], tax_benefit_system = country_tax_benefit_system, ) if data['reforms'] is not None else country_tax_benefit_system entities_class = tax_benefit_system.entity_class_by_key_plural.itervalues() entities = { entity_class.key_plural: build_entity_data(entity_class) for entity_class in entities_class } return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = data['context'], entities = collections.OrderedDict(sorted(entities.iteritems())), method = req.script_name, params = inputs, ).iteritems())), headers = headers, ) PK'G#R<^&openfisca_web_api/controllers/graph.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Graph controller""" import collections import datetime from openfisca_core import periods, simulations from .. import contexts, conv, model, wsgihelpers @wsgihelpers.wsgify def api1_graph(req): ctx = contexts.Ctx(req) headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx) assert req.method == 'GET', req.method params = req.GET inputs = dict( context = params.get('context'), reforms = params.getall('reform'), variable = params.get('variable'), ) if model.input_variables_extractor is None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 501, errors = [ctx._(u'openfisca_parsers is not installed on this instance of the API')], message = ctx._(u'Not implemented'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) str_to_reforms = conv.make_str_to_reforms() data, errors = conv.pipe( conv.struct( dict( context = conv.noop, # For asynchronous calls reforms = str_to_reforms, variable = conv.noop, # Real conversion is done once tax-benefit system is known. ), default = 'drop', ), )(inputs, state = ctx) if errors is None: country_tax_benefit_system = model.tax_benefit_system tax_benefit_system = model.get_cached_composed_reform( reform_keys = data['reforms'], tax_benefit_system = country_tax_benefit_system, ) if data['reforms'] is not None else country_tax_benefit_system data, errors = conv.struct( dict( variable = conv.pipe( conv.empty_to_none, conv.default(u'revdisp'), conv.test_in(tax_benefit_system.column_by_name), ), ), default = conv.noop, )(data, state = ctx) if errors is not None: return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = inputs.get('context'), error = collections.OrderedDict(sorted(dict( code = 400, # Bad Request errors = [conv.jsonify_value(errors)], message = ctx._(u'Bad parameters in request'), ).iteritems())), method = req.script_name, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) simulation = simulations.Simulation( period = periods.period(datetime.date.today().year), tax_benefit_system = tax_benefit_system, ) edges = [] nodes = [] visited = set() simulation.graph( column_name = data['variable'], edges = edges, get_input_variables_and_parameters = model.get_cached_input_variables_and_parameters, nodes = nodes, visited = visited, ) return wsgihelpers.respond_json(ctx, collections.OrderedDict(sorted(dict( apiVersion = 1, context = data['context'], edges = edges, method = req.script_name, nodes = nodes, params = inputs, url = req.url.decode('utf-8'), ).iteritems())), headers = headers, ) PK'Gtg% % (openfisca_web_api/controllers/formula.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Formula controller""" from datetime import datetime import numpy as np from openfisca_core import periods, simulations from .. import contexts, conv, model, wsgihelpers @wsgihelpers.wsgify def api1_formula(req): API_VERSION = 1 params = dict(req.GET) data = dict() try: period = parse_period(params.pop('period', None)) params = normalize(params) column = get_column_from_formula_name(req.urlvars.get('name')) data['value'] = compute(column.name, params, period) except Exception as error: data['error'] = error.args[0] finally: return respond(req, API_VERSION, data, params) @wsgihelpers.wsgify def api2_formula(req): """ A simple `GET`-, URL-based API to OpenFisca, making the assumption of computing formulas for a single person. Combination ----------- You can compute several formulas at once by combining the paths and joining them with `+`. Example: ``` /salsuperbrut+salaire_net_a_payer?salaire_de_base=1440 ``` This will compute both `salsuperbrut` and `salaire_net_a_payer` in a single request. URL size limit -------------- Using combination with a lot of parameters may lead to long URLs. If used within the browser, make sure the resulting URL is kept [under 2047 characters](http://stackoverflow.com/questions/417142) for cross-browser compatibility, by splitting combined requests. On a server, just test what your library handles. """ API_VERSION = '2.1.0' params = dict(req.GET) data = dict() try: params = normalize(params) formula_names = req.urlvars.get('names').split('+') data['values'] = dict() data['period'] = parse_period(req.urlvars.get('period')) for formula_name in formula_names: column = get_column_from_formula_name(formula_name) data['values'][formula_name] = compute(column.name, params, data['period']) except Exception as error: if isinstance(error.args[0], dict): # we raised it ourselves, in this controller error = error.args[0] else: error = dict( message = unicode(error), code = 500 ) data['error'] = error finally: return respond(req, API_VERSION, data, params) def get_column_from_formula_name(formula_name): result = model.tax_benefit_system.column_by_name.get(formula_name) if result is None: raise(Exception(dict( code = 404, message = u"You requested to compute variable '{}', but it does not exist" .format(formula_name) ))) if result.is_input_variable(): raise(Exception(dict( code = 422, message = u"You requested to compute variable '{}', but it is an input variable, it cannot be computed" .format(formula_name) ))) return result def normalize(params): result = dict() try: for param_name, value in params.items(): result[param_name] = normalize_param(param_name, value) except KeyError: raise Exception(dict( code = 400, message = u"You passed parameter '{}', but it does not exist".format(param_name) )) return result def normalize_param(name, value): column = model.tax_benefit_system.column_by_name[name] result, error = conv.pipe( column.input_to_dated_python # if column is not a date, this will be None and conv.pipe will be pass-through )(value) if error is not None: raise Exception(dict( code = 400, message = u"Parameter '{}' could not be normalized: {}".format(name, error) )) return result def parse_period(period_descriptor): period_descriptor = period_descriptor or default_period() try: result = periods.period(period_descriptor) except ValueError: raise(Exception(dict( code = 400, message = u"You requested computation for period '{}', but it could not be parsed as a period" .format(period_descriptor) ))) if result.unit not in ['year', 'month']: raise Exception(dict( code = 400, message = u"You passed period '{}', but it is not a month nor a year".format(period_descriptor) )) return result def default_period(): now = datetime.now() return '{}-{:02d}'.format(now.year, now.month) # req: the original request we're responding to. # version: a [semver](http://semver.org) identifier characterizing the version of the API. # data: dict. Will be transformed to JSON and added to the response root. # `data` will be mutated. Currently considered acceptable because responding marks process end. # params: dict. Parsed parameters. Will be echoed in the "params" key. def respond(req, version, data, params): data.update(dict( apiVersion = version, params = params )) ctx = contexts.Ctx(req) return wsgihelpers.respond_json( ctx, data, headers = wsgihelpers.handle_cross_origin_resource_sharing(ctx), json_dumps_default = wsgihelpers.convert_date_to_json, ) def compute(formula_name, params, period): simulation = create_simulation(params, period) resulting_dated_holder = simulation.compute(formula_name) return resulting_dated_holder.to_value_json()[0] # only one person => unwrap the array def create_simulation(data, period): result = simulations.Simulation( debug = False, period = period, tax_benefit_system = model.tax_benefit_system, ) # Initialize entities, assuming there is only one person and one of each other entities ("familles", # "foyers fiscaux", etc). persons = None for entity in result.entity_by_key_plural.itervalues(): entity.count = 1 entity.roles_count = 1 entity.step_size = 1 if entity.is_persons_entity: persons = entity # Link person to its entities using ID & role. for entity in result.entity_by_key_plural.itervalues(): if not entity.is_persons_entity: holder = persons.get_or_new_holder(entity.index_for_person_variable_name) holder.set_array(period, np.array([0])) holder = persons.get_or_new_holder(entity.role_for_person_variable_name) holder.set_array(period, np.array([0])) # Inject all variables from query string into arrays. for column_name, value in data.iteritems(): column = model.tax_benefit_system.column_by_name[column_name] entity = result.entity_by_key_plural[column.entity_key_plural] holder = entity.get_or_new_holder(column_name) if period.unit == 'year': holder.set_array(period, np.array([value], dtype = column.dtype)) elif period.unit == 'month': # Inject inputs for all months of year year = period.start.year month_index = 1 while month_index <= 12: month = periods.period('{}-{:02d}'.format(year, month_index)) holder.set_array(month, np.array([value], dtype = column.dtype)) month_index += 1 return result PKXE#openfisca_web_api/tests/__init__.pyPK'G~~~3openfisca_web_api/tests/test_swagger_integration.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from nose.tools import assert_greater from ..controllers.swagger import ( build_paths, ) from . import common def setup_module(module): common.get_or_load_app() def smoke_test_build_paths(): assert_greater(build_paths(), 100) PKF`g%%openfisca_web_api/tests/test_graph.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_graph_without_parameters(): req = Request.blank('/api/1/graph', method = 'GET') res = req.get_response(common.app) assert res.status_code == 200 PK'G~7Pnn&openfisca_web_api/tests/test_fields.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json from nose.tools import assert_equal, assert_greater, assert_in, assert_is_instance from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_fields_without_parameters(): req = Request.blank('/api/1/fields', method = 'GET') res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_json = json.loads(res.body) assert_is_instance(res_json, dict) assert_in('columns', res_json) assert_in('columns_tree', res_json) assert_is_instance(res_json['columns'], dict) assert_is_instance(res_json['columns_tree'], dict) assert_greater(len(res_json['columns']), 0) assert_greater(len(res_json['columns_tree']), 0) PKFwq'openfisca_web_api/tests/test_reforms.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_reforms_without_parameters(): req = Request.blank('/api/1/reforms', method = 'GET') res = req.get_response(common.app) assert res.status_code == 200 PK'G\~:88)openfisca_web_api/tests/test_formula_2.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json from webob import Request from nose.tools import assert_equal, assert_in, assert_not_in, assert_is_instance, assert_not_equal from . import common TARGET_URL = '/api/2/formula/' INPUT_VARIABLE = 'salaire_de_base' VALID_PERIOD = '2015-01' VALID_DAY = VALID_PERIOD + '-01' INVALID_PERIOD = 'herp' VALID_FORMULA = 'salaire_net_a_payer' DATED_FORMULA = 'allegement_fillon' DATE_PARAM = 'apprentissage_contrat_debut' DATE_PARAM_VALUE = VALID_PERIOD + '-03' VALID_OTHER_FORMULA = 'salaire_imposable' INVALID_FORMULA = 'inexistent' PARAM_VALUE = 1300 VALID_QUERY_STRING = '{0}={1}'.format(INPUT_VARIABLE, PARAM_VALUE) INVALID_QUERY_STRING = '{0}={1}'.format(INVALID_FORMULA, PARAM_VALUE) def send(formula = VALID_FORMULA, method = 'GET', period = '', query_string = ''): if period: period += '/' target = TARGET_URL + period + formula + '?' + query_string req = Request.blank(target, method = method) res = req.get_response(common.app) return { 'status_code': res.status_code, 'payload': json.loads(res.body) } def setup_module(module): common.get_or_load_app() def test_formula_get_status_code(): assert_equal(send()['status_code'], 200) def test_formula_post_status_code(): assert_equal(send(method = 'POST')['status_code'], 405) def test_formula_put_status_code(): assert_equal(send(method = 'PUT')['status_code'], 405) def test_formula_delete_status_code(): assert_equal(send(method = 'DELETE')['status_code'], 405) def test_formula_api_version(): assert_equal(send()['payload']['apiVersion'], '2.1.0') def test_dated_formula_status_code(): assert_equal(send(formula = DATED_FORMULA)['status_code'], 200) def test_not_a_formula_status_code(): assert_equal(send(formula = INPUT_VARIABLE)['status_code'], 422) def test_not_a_formula_error_message(): message = send(formula = INPUT_VARIABLE)['payload']['error']['message'] assert_in(INPUT_VARIABLE, message) assert_in('input variable', message) assert_in('cannot be computed', message) assert_not_in('{', message) # serialisation failed def test_invalid_formula_status_code(): assert_equal(send(formula = INVALID_FORMULA)['status_code'], 404) def test_invalid_formula_error_message(): message = send(formula = INVALID_FORMULA)['payload']['error']['message'] assert_in(INVALID_FORMULA, message) assert_in('does not exist', message) assert_not_in('{', message) # serialisation failed def test_invalid_formula_params(): params = send(formula = INVALID_FORMULA, query_string = VALID_QUERY_STRING)['payload']['params'] assert_equal({INPUT_VARIABLE: PARAM_VALUE}, params) def test_invalid_formula_with_valid_formula_status_code(): assert_equal(send(formula = VALID_FORMULA + '+' + INVALID_FORMULA)['status_code'], 404) def test_invalid_formula_with_valid_formula_error_message(): message = send(formula = VALID_FORMULA + '+' + INVALID_FORMULA)['payload']['error']['message'] assert_in(INVALID_FORMULA, message) assert_in('does not exist', message) assert_not_in('{', message) # serialisation failed def test_formula_value_without_params(): value = send()['payload']['values'][VALID_FORMULA] assert_is_instance(value, float) assert_equal(value, 0) def test_formula_value_with_params(): value = send(query_string = VALID_QUERY_STRING)['payload']['values'][VALID_FORMULA] assert_is_instance(value, float) assert_not_equal(value, 0) def test_formula_echo_params_without_params(): params = send()['payload']['params'] assert_equal({}, params) def test_formula_echo_params_with_params(): params = send(query_string = VALID_QUERY_STRING)['payload']['params'] assert_equal({INPUT_VARIABLE: PARAM_VALUE}, params) def test_echo_params_date(): params = send(query_string = DATE_PARAM + '=' + DATE_PARAM_VALUE)['payload']['params'] assert_equal({DATE_PARAM: DATE_PARAM_VALUE}, params) def test_bad_params_status_code(): assert_equal(send(query_string = INVALID_QUERY_STRING)['status_code'], 400) def test_bad_params_error_message(): message = send(query_string = INVALID_QUERY_STRING)['payload']['error']['message'] assert_in(INVALID_FORMULA, message) assert_in('does not exist', message) assert_not_in('{', message) # serialisation failed def test_unnormalizable_params_status_code(): assert_equal(send(query_string = 'birth=herp')['status_code'], 400) def test_unnormalizable_params_error_message(): message = send(query_string = 'birth=herp')['payload']['error']['message'] assert_in('birth', message) assert_in('normalized', message) assert_not_in('{', message) # serialisation failed def test_multiple_formulas_value_without_params(): values = send(formula = VALID_FORMULA + '+' + VALID_OTHER_FORMULA)['payload']['values'] for formula_name in (VALID_FORMULA, VALID_OTHER_FORMULA): assert_in(formula_name, values) assert_is_instance(values[formula_name], float) assert_equal(values[formula_name], 0) def test_period_year(): assert_equal(send(period = '2012')['payload']['period'], ['year', [2012, 1, 1], 1]) def test_period_month(): assert_equal(send(period = '2013-02')['payload']['period'], ['month', [2013, 2, 1], 1]) def test_invalid_period_status_code(): assert_equal(send(period = INVALID_PERIOD)['status_code'], 400) def test_invalid_period_error_message(): message = send(period = INVALID_PERIOD)['payload']['error']['message'] assert_in(INVALID_PERIOD, message) assert_in('could not be parsed', message) assert_not_in('{', message) # serialisation failed def test_invalid_period_day_status_code(): assert_equal(send(period = VALID_DAY)['status_code'], 400) def test_invalid_period_day_error_message(): message = send(period = VALID_DAY)['payload']['error']['message'] assert_in(VALID_DAY, message) assert_in('year', message) assert_in('month', message) assert_not_in('{', message) # serialisation failed PK'Gowj@%@%)openfisca_web_api/tests/test_calculate.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json from nose.tools import assert_equal, assert_is_instance from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_calculate_without_body(): req = Request.blank('/api/1/calculate', headers = (('Content-Type', 'application/json'),), method = 'POST') res = req.get_response(common.app) assert_equal(res.status_code, 400) def test_calculate_with_invalid_body(): req = Request.blank( '/api/1/calculate', body = 'XXX', headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 400) def test_calculate_with_test_case(): test_case = { 'scenarios': [ { 'test_case': { 'familles': [ { 'parents': ['ind0', 'ind1'], }, ], 'foyers_fiscaux': [ { 'declarants': ['ind0', 'ind1'], }, ], 'individus': [ {'id': 'ind0', 'salaire_imposable': 15000}, {'id': 'ind1'}, ], 'menages': [ { 'conjoint': 'ind1', 'personne_de_reference': 'ind0', }, ], }, 'period': '2013', }, ], 'variables': ['revdisp'], } req = Request.blank( '/api/1/calculate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) def test_calculate_with_labels(): # First test returning numeric values of enumerations. test_case = { 'scenarios': [ { 'test_case': { 'familles': [ { 'parents': ['ind0', 'ind1'], }, ], 'foyers_fiscaux': [ { 'declarants': ['ind0', 'ind1'], }, ], 'individus': [ {'id': 'ind0', 'type_sal': 'prive_non_cadre'}, {'id': 'ind1', 'type_sal': 'prive_cadre'}, ], 'menages': [ { 'conjoint': 'ind1', 'personne_de_reference': 'ind0', }, ], }, 'period': '2013', }, ], 'variables': ['type_sal'], } req = Request.blank( '/api/1/calculate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_body_json = json.loads(res.body) individus = res_body_json['value'][0]['individus'] assert_equal(individus[0]['type_sal']['2013'], 0) assert_equal(individus[1]['type_sal']['2013'], 1) # Then test returning labels of enumerations. test_case['labels'] = True req = Request.blank( '/api/1/calculate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_body_json = json.loads(res.body) individus = res_body_json['value'][0]['individus'] assert_equal(individus[0]['type_sal']['2013'], 'prive_non_cadre') assert_equal(individus[1]['type_sal']['2013'], 'prive_cadre') def test_calculate_with_output_format_variables(): test_case = { 'output_format': 'variables', 'scenarios': [ { 'test_case': { 'familles': [ { 'parents': ['ind0', 'ind1'], }, ], 'foyers_fiscaux': [ { 'declarants': ['ind0', 'ind1'], }, ], 'individus': [ {'id': 'ind0'}, {'id': 'ind1'}, ], 'menages': [ { 'conjoint': 'ind1', 'personne_de_reference': 'ind0', }, ], }, 'period': '2013', }, ], 'variables': ['irpp'], } req = Request.blank( '/api/1/calculate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_body_json = json.loads(res.body) value = res_body_json['value'] assert_is_instance(value, list) assert_is_instance(value[0], dict) assert_equal(value[0].keys()[0], 'irpp') def test_calculate_with_reform(): test_case = { 'base_reforms': ['trannoy_wasmer'], 'scenarios': [ { 'test_case': { 'familles': [ { 'parents': ['ind0'], }, ], 'foyers_fiscaux': [ { 'declarants': ['ind0'], }, ], 'individus': [ {'id': 'ind0'}, ], 'menages': [ { 'personne_de_reference': 'ind0', }, ], }, 'period': '2013', }, ], 'variables': ['charge_loyer'], } req = Request.blank( '/api/1/calculate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) def test_calculate_with_trace(): test_case = { 'scenarios': [ { 'test_case': { 'familles': [ { 'parents': ['ind0', 'ind1'], }, ], 'foyers_fiscaux': [ { 'declarants': ['ind0', 'ind1'], }, ], 'individus': [ {'id': 'ind0'}, {'id': 'ind1'}, ], 'menages': [ { 'conjoint': 'ind1', 'personne_de_reference': 'ind0', }, ], }, 'period': '2014', }, ], 'trace': True, 'variables': ['irpp'], } req = Request.blank( '/api/1/calculate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_body_json = json.loads(res.body) tracebacks = res_body_json['tracebacks'] assert_is_instance(tracebacks, list) first_scenario_tracebacks = tracebacks[0] assert_is_instance(first_scenario_tracebacks, list) first_traceback = first_scenario_tracebacks[0] assert_is_instance(first_traceback, dict) traceback_with_parameters = next(item for item in first_scenario_tracebacks if item.get('parameters')) assert_is_instance(traceback_with_parameters['parameters'], list) PKFZ(openfisca_web_api/tests/test_entities.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_entities_without_parameters(): req = Request.blank('/api/1/entities', method = 'GET') res = req.get_response(common.app) assert res.status_code == 200 PK'GdI'openfisca_web_api/tests/test_swagger.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from nose.tools import assert_equal, assert_in from .. import model from ..controllers.swagger import ( build_metadata, map_path_base_to_swagger, map_type_to_swagger, map_parameters_to_swagger, map_parameter_to_swagger, ) from . import common def setup_module(module): common.get_or_load_app() def test_metadata_version(): actual = build_metadata() assert_equal('2.0', actual['swagger']) def test_metadata_description(): actual = build_metadata() assert_in('```', actual['info']['description'], 'Description should be Markdown') def test_map_path_base_to_swagger_without_url(): expected = { "summary": "Nombre d'enfants à charge titulaires de la carte d'invalidité", "tags": ["foy"] } actual = map_path_base_to_swagger({ "@type": "Integer", "cerfa_field": "G", "default": 0, "entity": "foy", "label": "Nombre d'enfants à charge titulaires de la carte d'invalidité", "name": "nbG" }) assert_equal(actual, expected) def test_map_path_base_to_swagger_with_url(): expected = { "summary": "Contribution exceptionnelle sur les hauts revenus", "tags": ["foy"], "externalDocs": { "url": "http://www.legifrance.gouv.fr/affichCode.do?" "cidTexte=LEGITEXT000006069577&idSectionTA=LEGISCTA000025049019" } } actual = map_path_base_to_swagger({ "@type": "Float", "default": 0, "entity": "foy", "label": "Contribution exceptionnelle sur les hauts revenus", "name": "cehr", "url": "http://www.legifrance.gouv.fr/affichCode.do?" "cidTexte=LEGITEXT000006069577&idSectionTA=LEGISCTA000025049019" }) assert_equal(actual, expected) def test_map_path_base_to_swagger_with_enum(): expected = { "summary": "Catégorie de taille d'entreprise (pour calcul des cotisations sociales)", "tags": ["ind"] } actual = map_path_base_to_swagger({ "@type": "Enumeration", "default": "0", "entity": "ind", "label": "Catégorie de taille d'entreprise (pour calcul des cotisations sociales)", "name": "taille_entreprise", "labels": { "0": "Non pertinent", "1": "Moins de 10 salariés", "2": "De 10 à 19 salariés", "3": "De 20 à 249 salariés", "4": "Plus de 250 salariés" } }) assert_equal(actual, expected) def test_map_type_to_swagger_integer(): assert_equal(map_type_to_swagger('Integer'), {'type': 'integer', 'format': 'int32'}) def test_map_type_to_swagger_float(): assert_equal(map_type_to_swagger('Float'), {'type': 'number', 'format': 'float'}) def test_map_type_to_swagger_date(): assert_equal(map_type_to_swagger('Date'), {'type': 'string', 'format': 'date'}) def test_map_type_to_swagger_boolean(): assert_equal(map_type_to_swagger('Boolean'), {'type': 'boolean'}) def test_map_type_to_swagger_string(): assert_equal(map_type_to_swagger('String'), {'type': 'string'}) def test_map_type_to_swagger_enum(): assert_equal(map_type_to_swagger('Enumeration'), {'type': 'string'}) def test_map_parameters_to_swagger(): target_column = model.tax_benefit_system.column_by_name['revdisp'] actual = [ description.get('name') for description in map_parameters_to_swagger(target_column) ] assert_equal(actual, ['ppe', 'rev_trav', 'rev_cap', 'pen', 'psoc', 'impo']) def test_map_parameter_to_swagger(): target_column = model.tax_benefit_system.column_by_name['rev_cap'] expected = { 'name': u'rev_cap', 'description': u'Revenus du patrimoine', 'in': 'query', 'type': 'number', 'format': 'float', 'default': 0 } assert_equal(map_parameter_to_swagger(target_column), expected) def test_map_enum_parameter_to_swagger(): target_column = model.tax_benefit_system.column_by_name['taille_entreprise'] expected = { 'name': u'taille_entreprise', 'description': u"Catégode taille d'entreprise (pour calcul des cotisations sociales)", 'in': 'query', 'type': 'string', 'enum': [ u'Non pertinent', u'Moins de 10 salariés', u'De 10 à 19 salariés', u'De 20 à 249 salariés', u'Plus de 250 salariés', ], 'default': u'Non pertinent', } assert_equal(map_parameter_to_swagger(target_column), expected) PK'GA+%openfisca_web_api/tests/test_field.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json from nose.tools import assert_equal, assert_in, assert_is_instance from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_field_without_parameters(): req = Request.blank('/api/1/field', method = 'GET') res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_json = json.loads(res.body) assert_is_instance(res_json, dict) assert_in('value', res_json) assert_is_instance(res_json['value'], dict) assert_in('name', res_json['value']) assert_is_instance(res_json['value']['name'], basestring) PK'G@!openfisca_web_api/tests/common.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import os import pkg_resources from paste.deploy import loadapp app = None CONF_FILE_NAME = 'test.ini' def get_or_load_app(): global app if app is None: pkg_root_dir = pkg_resources.get_distribution('OpenFisca-Web-API').location conf_file_path = os.path.join(pkg_root_dir, CONF_FILE_NAME) app = loadapp(u'config:{}#main'.format(conf_file_path)) return app PKF{>>)openfisca_web_api/tests/test_formula_1.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json from webob import Request from nose.tools import assert_equal, assert_in, assert_not_in, assert_is_instance, assert_not_equal from . import common TARGET_URL = '/api/1/formula/' INPUT_VARIABLE = 'salaire_de_base' VALID_FORMULA = 'salaire_net_a_payer' INVALID_FORMULA = 'inexistent' PARAM_VALUE = 1300 VALID_QUERY_STRING = '?{0}={1}'.format(INPUT_VARIABLE, PARAM_VALUE) INVALID_QUERY_STRING = '?{0}={1}'.format(INVALID_FORMULA, PARAM_VALUE) def send(formula = VALID_FORMULA, method = 'GET', query_string = ''): target = TARGET_URL + formula + query_string req = Request.blank(target, method = method) res = req.get_response(common.app) return { 'status_code': res.status_code, 'payload': json.loads(res.body) } def setup_module(module): common.get_or_load_app() def test_formula_get_status_code(): assert_equal(send()['status_code'], 200) def test_formula_post_status_code(): assert_equal(send(method = 'POST')['status_code'], 405) def test_formula_put_status_code(): assert_equal(send(method = 'PUT')['status_code'], 405) def test_formula_delete_status_code(): assert_equal(send(method = 'DELETE')['status_code'], 405) def test_formula_api_version(): assert_equal(send()['payload']['apiVersion'], 1) def test_not_a_formula_status_code(): assert_equal(send(formula = INPUT_VARIABLE)['status_code'], 422) def test_not_a_formula_error_message(): message = send(formula = INPUT_VARIABLE)['payload']['error']['message'] assert_in(INPUT_VARIABLE, message) assert_in('input variable', message) assert_in('cannot be computed', message) def test_not_a_formula_value(): assert_not_in('value', send(formula = INPUT_VARIABLE)['payload']) def test_invalid_formula_status_code(): assert_equal(send(formula = INVALID_FORMULA)['status_code'], 404) def test_invalid_formula_error_message(): message = send(formula = INVALID_FORMULA)['payload']['error']['message'] assert_in(INVALID_FORMULA, message) assert_in('does not exist', message) def test_invalid_formula_value(): assert_not_in('value', send(formula = INVALID_FORMULA)['payload']) def test_invalid_formula_params(): params = send(formula = INVALID_FORMULA, query_string = VALID_QUERY_STRING)['payload']['params'] assert_equal({INPUT_VARIABLE: PARAM_VALUE}, params) def test_formula_value_without_params(): value = send()['payload']['value'] assert_is_instance(value, float) assert_equal(value, 0) def test_formula_value_with_params(): value = send(query_string = VALID_QUERY_STRING)['payload']['value'] assert_is_instance(value, float) assert_not_equal(value, 0) def test_formula_echo_params_without_params(): params = send()['payload']['params'] assert_equal({}, params) def test_formula_echo_params_with_params(): params = send(query_string = VALID_QUERY_STRING)['payload']['params'] assert_equal({INPUT_VARIABLE: PARAM_VALUE}, params) def test_bad_params_status_code(): assert_equal(send(query_string = INVALID_QUERY_STRING)['status_code'], 400) def test_bad_params_error_message(): message = send(query_string = INVALID_QUERY_STRING)['payload']['error']['message'] assert_in(INVALID_FORMULA, message) assert_in('does not exist', message) def test_bad_params_value(): assert_not_in('value', send(query_string = INVALID_QUERY_STRING)['payload']) def test_unnormalizable_params_status_code(): assert_equal(send(query_string = '?birth=herp')['status_code'], 400) def test_unnormalizable_params_error_message(): message = send(query_string = '?birth=herp')['payload']['error']['message'] assert_in('birth', message) assert_in('normalized', message) def test_unnormalizable_params_value(): assert_not_in('value', send(query_string = '?birth=herp')['payload']) def test_bad_period_status_code(): assert_equal(send(query_string = '?period=herp')['status_code'], 400) def test_bad_period_error_message(): message = send(query_string = '?period=herp')['payload']['error']['message'] assert_in('herp', message) assert_in('could not be parsed', message) def test_bad_period_value(): assert_not_in('value', send(query_string = '?period=herp')['payload']) PK'Gי (openfisca_web_api/tests/test_simulate.py# -*- coding: utf-8 -*- # OpenFisca -- A versatile microsimulation software # By: OpenFisca Team # # Copyright (C) 2011, 2012, 2013, 2014, 2015 OpenFisca Team # https://github.com/openfisca # # This file is part of OpenFisca. # # OpenFisca is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # OpenFisca is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json from nose.tools import assert_equal, assert_in, assert_not_in from webob import Request from . import common def setup_module(module): common.get_or_load_app() def test_simulate_without_body(): req = Request.blank('/api/1/simulate', headers = (('Content-Type', 'application/json'),), method = 'POST') res = req.get_response(common.app) assert_equal(res.status_code, 400) def test_simulate_with_invalid_body(): req = Request.blank( '/api/1/simulate', body = 'XXX', headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 400) def test_simulate_with_test_case(): test_case = { 'scenarios': [ { 'test_case': { 'familles': [ { 'parents': ['ind0', 'ind1'], }, ], 'foyers_fiscaux': [ { 'declarants': ['ind0', 'ind1'], }, ], 'individus': [ {'id': 'ind0', 'salaire_imposable': 15000}, {'id': 'ind1'}, ], 'menages': [ { 'conjoint': 'ind1', 'personne_de_reference': 'ind0', }, ], }, 'period': '2013', }, ], } req = Request.blank( '/api/1/simulate', body = json.dumps(test_case), headers = (('Content-Type', 'application/json'),), method = 'POST', ) res = req.get_response(common.app) assert_equal(res.status_code, 200, res.body) res_body_json = json.loads(res.body) assert_not_in('error', res_body_json) assert_in('value', res_body_json) PKS|/G^- 1OpenFisca_Web_API-0.5.1.dist-info/DESCRIPTION.rstUNKNOWN PKS|/G}CCC2OpenFisca_Web_API-0.5.1.dist-info/entry_points.txt[paste.app_factory] main = openfisca_web_api.application:make_app PKS|/GY+MM/OpenFisca_Web_API-0.5.1.dist-info/metadata.json{"classifiers": ["Development Status :: 2 - Pre-Alpha", "Environment :: Web Environment", "License :: OSI Approved :: GNU Affero General Public License v3", "Operating System :: POSIX", "Programming Language :: Python", "Topic :: Scientific/Engineering :: Information Analysis", "Topic :: Internet :: WWW/HTTP :: WSGI :: Server"], "extensions": {"python.details": {"contacts": [{"email": "contact@openfisca.fr", "name": "OpenFisca Team", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://github.com/openfisca/openfisca-web-api"}}, "python.exports": {"paste.app_factory": {"main": "openfisca_web_api.application:make_app"}}}, "extras": ["dev", "france", "test"], "generator": "bdist_wheel (0.24.0)", "keywords": ["api", "benefit", "microsimulation", "server", "social", "tax", "web"], "license": "http://www.fsf.org/licensing/licenses/agpl-3.0.html", "metadata_version": "2.0", "name": "OpenFisca-Web-API", "run_requires": [{"extra": "dev", "requires": ["PasteScript"]}, {"extra": "test", "requires": ["nose"]}, {"extra": "france", "requires": ["OpenFisca-France (>=0.5.1)"]}, {"requires": ["Babel (>=0.9.4)", "Biryani (>=0.10.4)", "OpenFisca-Core (>=0.5.0)", "OpenFisca-Parsers (>=0.5)", "PasteDeploy", "WebError (>=0.10)", "WebOb (>=1.1)"]}], "summary": "Web API for OpenFisca", "version": "0.5.1"}PKFDBoX 4OpenFisca_Web_API-0.5.1.dist-info/paster_plugins.txtPasteScript PKS|/GyB//*OpenFisca_Web_API-0.5.1.dist-info/pbr.json{"is_release": false, "git_version": "25ab32e"}PKS|/G /OpenFisca_Web_API-0.5.1.dist-info/top_level.txtopenfisca_web_api PKS|/G4\\'OpenFisca_Web_API-0.5.1.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.24.0) Root-Is-Purelib: true Tag: py2-none-any PKS|/GkC*OpenFisca_Web_API-0.5.1.dist-info/METADATAMetadata-Version: 2.0 Name: OpenFisca-Web-API Version: 0.5.1 Summary: Web API for OpenFisca Home-page: https://github.com/openfisca/openfisca-web-api Author: OpenFisca Team Author-email: contact@openfisca.fr License: http://www.fsf.org/licensing/licenses/agpl-3.0.html Keywords: api benefit microsimulation server social tax web Platform: UNKNOWN Classifier: Development Status :: 2 - Pre-Alpha Classifier: Environment :: Web Environment Classifier: License :: OSI Approved :: GNU Affero General Public License v3 Classifier: Operating System :: POSIX Classifier: Programming Language :: Python Classifier: Topic :: Scientific/Engineering :: Information Analysis Classifier: Topic :: Internet :: WWW/HTTP :: WSGI :: Server Requires-Dist: Babel (>=0.9.4) Requires-Dist: Biryani (>=0.10.4) Requires-Dist: OpenFisca-Core (>=0.5.0) Requires-Dist: OpenFisca-Parsers (>=0.5) Requires-Dist: PasteDeploy Requires-Dist: WebError (>=0.10) Requires-Dist: WebOb (>=1.1) Provides-Extra: dev Requires-Dist: PasteScript; extra == 'dev' Provides-Extra: france Requires-Dist: OpenFisca-France (>=0.5.1); extra == 'france' Provides-Extra: test Requires-Dist: nose; extra == 'test' UNKNOWN PKS|/G\(OpenFisca_Web_API-0.5.1.dist-info/RECORDOpenFisca_Web_API-0.5.1.data/data/share/locale/fr/LC_MESSAGES/openfisca-web-api.mo,sha256=mY-CReh85RSkgslFVvN5IRMhogSe9iGWwncB_iurXsM,1758 OpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/test.ini,sha256=deKij130XEmA6QK8Ho5DATD6EzyHFUR76TS73cqSMzk,788 OpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/development-france.ini,sha256=bjH5F61fVt8f-xYd1LkUHfg9X6bnCczxseWCxK63Eoc,1555 OpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/development-tunisia.ini,sha256=AkY8Pf32pwYY1BuiRMY7qsM13N-AD-7aQqaPxPlbsCg,972 OpenFisca_Web_API-0.5.1.dist-info/RECORD,, OpenFisca_Web_API-0.5.1.dist-info/paster_plugins.txt,sha256=os9smPTefiOMBlMS9ZC7Yv_MbR_fG7U2_jil4jtEV3k,12 OpenFisca_Web_API-0.5.1.dist-info/WHEEL,sha256=54bVun1KfEBTJ68SHUmbxNPj80VxlQ0sHi4gZdGZXEY,92 OpenFisca_Web_API-0.5.1.dist-info/entry_points.txt,sha256=LJ_sM3bhPItL-JWspYRafVjJAEKvBFvQfnVvHUHfYfM,67 OpenFisca_Web_API-0.5.1.dist-info/DESCRIPTION.rst,sha256=OCTuuN6LcWulhHS3d5rfjdsQtW22n7HENFRh6jC6ego,10 OpenFisca_Web_API-0.5.1.dist-info/pbr.json,sha256=cIS2eBx69if_B7hykEUD1EOaZQ3T-cNbbmU1CthFk7A,47 OpenFisca_Web_API-0.5.1.dist-info/METADATA,sha256=eald9sM3kQfzYzoow8iBCktRaRQc3bt-wzOhpKuu180,1174 OpenFisca_Web_API-0.5.1.dist-info/top_level.txt,sha256=AAh0R5YcAroRpdryfxlT_thSxsN8SWGwj8rXXkZJLoo,18 OpenFisca_Web_API-0.5.1.dist-info/metadata.json,sha256=ov94ECDXQL7HFlAr_OLqTaf7njawSUaDg3Cj1DJAO3o,1357 openfisca_web_api/urls.py,sha256=1MYAI9SoaYJv164n0WkSKyMyDkGzBU8NR9pshHSPYmQ,5905 openfisca_web_api/__init__.py,sha256=mchF-oYF8RfJg0PUuhszKCW2KbWKdyXc4hgFsMTjsZk,968 openfisca_web_api/wsgihelpers.py,sha256=nGlm9_I_qJHzveyi3VVNwWaz3nGmpmnfF7KGww0VXTE,5044 openfisca_web_api/contexts.py,sha256=VIqKvyswOcBDXUB3qz1_bmTWAAlEzKUunnCSscZtGtc,7148 openfisca_web_api/conv.py,sha256=gU6Q_f2rShHH6heW2hC63Qxm0tuj1e0rkASYPi-XE3g,4127 openfisca_web_api/application.py,sha256=RmXce4GQADb8jd4HFRVR0u4sBnHzy-Jtfx9grN284mU,4100 openfisca_web_api/model.py,sha256=d2LYA0rUX-FlYWh93mOzRggKS-KNkfYXHA6U7l4ca4o,3810 openfisca_web_api/environment.py,sha256=oq2-68Cc2EAdJTXaeEAl9NDeDTscl7gW_OcEb2oSq2c,9075 openfisca_web_api/controllers/simulate.py,sha256=2Z8DrFtYPON-POL_e3aNmzMe8JkumrBtl_VZcvpRBJo,12832 openfisca_web_api/controllers/parameters.py,sha256=7U0T7PyOtMyzcn0mj6u_Nr_8G6UZEYE4NCLhwuEELes,4727 openfisca_web_api/controllers/swagger.py,sha256=rz7eBny_Dbw55OFe5foVz0DJAylzDNn1mw_LZ0IM_LE,7595 openfisca_web_api/controllers/__init__.py,sha256=uKfHir4_QHQLgtMRjjx0DY7Ez8WmULQYO_faT2jE6NI,2620 openfisca_web_api/controllers/variables.py,sha256=kG78jPpKrJMWePqXZWc1Zw30IQrAWiRC2phHtJmQWFY,3440 openfisca_web_api/controllers/calculate.py,sha256=-Jbwbl_nILgj0wiMuxyqB936sWXUyvI-Ulv0-AWr4Ic,19558 openfisca_web_api/controllers/fields.py,sha256=Y-EhZ550lVgrje1NjiPVQ8EKd-4OURXAsjOVrH3jxxk,3766 openfisca_web_api/controllers/field.py,sha256=lZah-LskgqfSW9ZARgMg-HWBm1z0P6eJRJDhhxHj6g4,4640 openfisca_web_api/controllers/reforms.py,sha256=qs7VDH7ZRatS6qz_RgiGp_U7-mdiI945Yq2lMPFilUo,2966 openfisca_web_api/controllers/entities.py,sha256=xZ-6FbOOhOfAQDSH8algOv9XfP1dTr1XRVmDnFh3-CA,3647 openfisca_web_api/controllers/graph.py,sha256=TK64t4YA17_nWkWiWXjxE4mUN6K5pFHO8s4pdnVNH4k,4753 openfisca_web_api/controllers/formula.py,sha256=qfHlzoM9DiHc0umGxugW18BfSkP0IBC5S4HartiJsvE,8229 openfisca_web_api/tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 openfisca_web_api/tests/test_swagger_integration.py,sha256=6SxvOme0u8m3difAEznY3fKYR456divlaSiLjnklIn0,1150 openfisca_web_api/tests/test_graph.py,sha256=wxP_H9JE7iMRIGOR3bYdYfBKOQPyr5OK2GDZQkdQ18M,1177 openfisca_web_api/tests/test_fields.py,sha256=lk8E04X8D0RBulyEbw1EyJ-uRF12SQkelxOiLyzCte0,1646 openfisca_web_api/tests/test_reforms.py,sha256=sNTTTXjInHaGsXKGf8599M3XiK7ki_N73jezOroAkro,1181 openfisca_web_api/tests/test_formula_2.py,sha256=INtrrqxrtPtotC42VzVX6Zqs0uG7bhbjIibAp7TcTQg,6968 openfisca_web_api/tests/test_calculate.py,sha256=3fHE0G5gd7wvJ5rN-CVE9OscbZJ1_1vmvfsMxEBtF3s,9536 openfisca_web_api/tests/test_entities.py,sha256=rGL1azArK6_K5krILhJgNZCJBuEPOXeW098yTXHsSJE,1183 openfisca_web_api/tests/test_swagger.py,sha256=j7OtJLl9Zaw0XnGcmLI-AvhwRr_0BV7-LAQYtxT4HC0,5527 openfisca_web_api/tests/test_field.py,sha256=Y5uMXDDA28a-lN424S6TLZjgKbvYFw3NISCvqKYUKE8,1531 openfisca_web_api/tests/common.py,sha256=y_lT2yLmsn_LnvajYEmJttNOoxttaDq1Wm-aMqREfOM,1301 openfisca_web_api/tests/test_formula_1.py,sha256=IpU7-OxDQNyjatPhuUzJdxtHXGuCiVGHDyZiXAsq6Kg,5182 openfisca_web_api/tests/test_simulate.py,sha256=jcgrPAEYQoJwiOwrMrpc7gvmjCcdcYy_8__phrcbCgs,2969 PKT'G.ROpenFisca_Web_API-0.5.1.data/data/share/locale/fr/LC_MESSAGES/openfisca-web-api.moPK'G.*LNOpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/test.iniPK'GZ{Z OpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/development-france.iniPKFm1i[WOpenFisca_Web_API-0.5.1.data/data/share/openfisca/openfisca-web-api/development-tunisia.iniPKFgexopenfisca_web_api/urls.pyPKF&,openfisca_web_api/__init__.pyPK'G 0openfisca_web_api/wsgihelpers.pyPKF]rxDopenfisca_web_api/contexts.pyPK'Gaopenfisca_web_api/conv.pyPK'Gs Vqopenfisca_web_api/application.pyPK'G9ٖopenfisca_web_api/model.pyPK'G-s#s# openfisca_web_api/environment.pyPK'G 2 2)copenfisca_web_api/controllers/simulate.pyPK'GtCww+openfisca_web_api/controllers/parameters.pyPK'Geĭ(openfisca_web_api/controllers/swagger.pyPK'G:W< < ){openfisca_web_api/controllers/__init__.pyPK'GXp p *!openfisca_web_api/controllers/variables.pyPK'Gû`/fLfL*/openfisca_web_api/controllers/calculate.pyPK'G0'd|openfisca_web_api/controllers/fields.pyPK'G5  &_openfisca_web_api/controllers/field.pyPK'G@7 (Ýopenfisca_web_api/controllers/reforms.pyPK'G??)openfisca_web_api/controllers/entities.pyPK'G#R<^&%openfisca_web_api/controllers/graph.pyPK'Gtg% % (openfisca_web_api/controllers/formula.pyPKXE#eopenfisca_web_api/tests/__init__.pyPK'G~~~3openfisca_web_api/tests/test_swagger_integration.pyPKF`g%%uopenfisca_web_api/tests/test_graph.pyPK'G~7Pnn&Qopenfisca_web_api/tests/test_fields.pyPKFwq'openfisca_web_api/tests/test_reforms.pyPK'G\~:88)openfisca_web_api/tests/test_formula_2.pyPK'Gowj@%@%)dopenfisca_web_api/tests/test_calculate.pyPKFZ(Aopenfisca_web_api/tests/test_entities.pyPK'GdI'Fopenfisca_web_api/tests/test_swagger.pyPK'GA+%\openfisca_web_api/tests/test_field.pyPK'G@!bopenfisca_web_api/tests/common.pyPKF{>>)>hopenfisca_web_api/tests/test_formula_1.pyPK'Gי (|openfisca_web_api/tests/test_simulate.pyPKS|/G^- 1OpenFisca_Web_API-0.5.1.dist-info/DESCRIPTION.rstPKS|/G}CCC2OpenFisca_Web_API-0.5.1.dist-info/entry_points.txtPKS|/GY+MM/OpenFisca_Web_API-0.5.1.dist-info/metadata.jsonPKFDBoX 4(OpenFisca_Web_API-0.5.1.dist-info/paster_plugins.txtPKS|/GyB//*OpenFisca_Web_API-0.5.1.dist-info/pbr.jsonPKS|/G /OpenFisca_Web_API-0.5.1.dist-info/top_level.txtPKS|/G4\\'\OpenFisca_Web_API-0.5.1.dist-info/WHEELPKS|/GkC*OpenFisca_Web_API-0.5.1.dist-info/METADATAPKS|/G\(ەOpenFisca_Web_API-0.5.1.dist-info/RECORDPK..