PK6=H`Zgajja/__init__.py# -*- coding: utf-8; -*- # # gajja/__init__.py # Part of Gajja, a Python test double library. # # Copyright © 2015–2016 Ben Finney # # This is free software: you may copy, modify, and/or distribute this work # under the terms of the GNU General Public License as published by the # Free Software Foundation; version 3 of that license or any later version. # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details. """ Gajja: Fake objects for real tests The `gajja` library provides a system of Python test double classes for specific system objects:: * Filesystem entries * Subprocesses The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”. """ from __future__ import (absolute_import, unicode_literals) import sys if sys.version_info >= (3, 3): import builtins import unittest import unittest.mock as mock from io import StringIO as StringIO import configparser import collections.abc as collections_abc elif sys.version_info >= (3, 0): raise RuntimeError("Python 3 earlier than 3.3 is not supported.") elif sys.version_info >= (2, 7): # Python 2 standard library. import __builtin__ as builtins # Third-party backport of Python 3 unittest improvements. import unittest2 as unittest # Third-party mock library. import mock # Python 2 standard library. from StringIO import StringIO as BaseStringIO import ConfigParser as configparser import collections as collections_abc else: raise RuntimeError("Python earlier than 2.7 is not supported.") import os import os.path import io import shutil import tempfile import pwd import errno import time import signal import subprocess import functools import itertools import base64 import collections import weakref import shlex __package__ = str("gajja") __import__(__package__) __metaclass__ = type try: # Python 2 types. basestring unicode except NameError: # Alias for Python 3 types. basestring = str unicode = str def make_unique_slug(testcase): """ Make a unique slug for the test case. """ text = base64.b64encode( testcase.getUniqueString().encode('utf-8') ).decode('utf-8') result = text[-30:] return result try: StringIO except NameError: # We don't yet have the StringIO we want. Create it. class StringIO(BaseStringIO, object): """ StringIO with a context manager. """ def __enter__(self): return self def __exit__(self, *args): self.close() return False def readable(self): return True def writable(self): return True def seekable(self): return True def patch_stdout(testcase): """ Patch `sys.stdout` for the specified test case. """ patcher = mock.patch.object( sys, "stdout", wraps=StringIO()) patcher.start() testcase.addCleanup(patcher.stop) def patch_stderr(testcase): """ Patch `sys.stderr` for the specified test case. """ patcher = mock.patch.object( sys, "stderr", wraps=StringIO()) patcher.start() testcase.addCleanup(patcher.stop) def patch_signal_signal(testcase): """ Patch `signal.signal` for the specified test case. """ func_patcher = mock.patch.object(signal, "signal", autospec=True) func_patcher.start() testcase.addCleanup(func_patcher.stop) class FakeSystemExit(Exception): """ Fake double for `SystemExit` exception. """ EXIT_STATUS_SUCCESS = 0 EXIT_STATUS_FAILURE = 1 EXIT_STATUS_COMMAND_NOT_FOUND = 127 def patch_sys_exit(testcase): """ Patch `sys.exit` for the specified test case. """ func_patcher = mock.patch.object( sys, "exit", autospec=True, side_effect=FakeSystemExit()) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_sys_argv(testcase): """ Patch the `sys.argv` sequence for the test case. """ if not hasattr(testcase, 'progname'): testcase.progname = make_unique_slug(testcase) if not hasattr(testcase, 'sys_argv'): testcase.sys_argv = [testcase.progname] patcher = mock.patch.object( sys, "argv", new=list(testcase.sys_argv)) patcher.start() testcase.addCleanup(patcher.stop) def patch_system_interfaces(testcase): """ Patch system interfaces that are disruptive to the test runner. """ patch_stdout(testcase) patch_stderr(testcase) patch_sys_exit(testcase) patch_sys_argv(testcase) def patch_time_time(testcase, values=None): """ Patch the `time.time` function for the specified test case. :param testcase: The `TestCase` instance for binding to the patch. :param values: An iterable to provide return values. :return: None. """ if values is None: values = itertools.count() def generator_fake_time(): while True: yield next(values) func_patcher = mock.patch.object(time, "time", autospec=True) func_patcher.start() testcase.addCleanup(func_patcher.stop) time.time.side_effect = generator_fake_time() def patch_os_environ(testcase): """ Patch the `os.environ` mapping. """ if not hasattr(testcase, 'os_environ'): testcase.os_environ = {} patcher = mock.patch.object(os, "environ", new=testcase.os_environ) patcher.start() testcase.addCleanup(patcher.stop) def patch_os_getpid(testcase): """ Patch `os.getpid` for the specified test case. """ func_patcher = mock.patch.object(os, "getpid", autospec=True) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_getuid(testcase): """ Patch the `os.getuid` function. """ if not hasattr(testcase, 'os_getuid_return_value'): testcase.os_getuid_return_value = testcase.getUniqueInteger() func_patcher = mock.patch.object( os, "getuid", autospec=True, return_value=testcase.os_getuid_return_value) func_patcher.start() testcase.addCleanup(func_patcher.stop) PasswdEntry = collections.namedtuple( "PasswdEntry", "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell") def patch_pwd_getpwuid(testcase): """ Patch the `pwd.getpwuid` function. """ if not hasattr(testcase, 'pwd_getpwuid_return_value'): testcase.pwd_getpwuid_return_value = PasswdEntry( pw_name=make_unique_slug(testcase), pw_passwd=make_unique_slug(testcase), pw_uid=testcase.getUniqueInteger(), pw_gid=testcase.getUniqueInteger(), pw_gecos=testcase.getUniqueString(), pw_dir=tempfile.mktemp(), pw_shell=tempfile.mktemp()) if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd): pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value) else: pwent = testcase.pwd_getpwuid_return_value func_patcher = mock.patch.object( pwd, "getpwuid", autospec=True, return_value=pwent) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_path_exists(testcase): """ Patch `os.path.exists` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_os_path_exists = os.path.exists def fake_os_path_exists(path): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.os_path_exists_scenario.call_hook() else: result = orig_os_path_exists(path) return result func_patcher = mock.patch.object( os.path, "exists", autospec=True, side_effect=fake_os_path_exists) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_access(testcase): """ Patch `os.access` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_os_access = os.access def fake_os_access(path, mode): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.os_access_scenario.call_hook(mode) else: result = orig_os_access(path, mode) return result func_patcher = mock.patch.object( os, "access", autospec=True, side_effect=fake_os_access) func_patcher.start() testcase.addCleanup(func_patcher.stop) StatResult = collections.namedtuple( 'StatResult', [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime', 'st_ctime', ]) def patch_os_stat(testcase): """ Patch `os.stat` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_os_stat = os.stat def fake_os_stat(path): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.os_stat_scenario.call_hook() else: result = orig_os_stat(path) return result func_patcher = mock.patch.object( os, "stat", autospec=True, side_effect=fake_os_stat) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_lstat(testcase): """ Patch `os.lstat` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_os_lstat = os.lstat def fake_os_lstat(path): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.os_lstat_scenario.call_hook() else: result = orig_os_lstat(path) return result func_patcher = mock.patch.object( os, "lstat", autospec=True, side_effect=fake_os_lstat) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_unlink(testcase): """ Patch `os.unlink` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_os_unlink = os.unlink def fake_os_unlink(path): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.os_unlink_scenario.call_hook() else: result = orig_os_unlink(path) return result func_patcher = mock.patch.object( os, "unlink", autospec=True, side_effect=fake_os_unlink) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_rmdir(testcase): """ Patch `os.rmdir` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_os_rmdir = os.rmdir def fake_os_rmdir(path): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.os_rmdir_scenario.call_hook() else: result = orig_os_rmdir(path) return result func_patcher = mock.patch.object( os, "rmdir", autospec=True, side_effect=fake_os_rmdir) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_shutil_rmtree(testcase): """ Patch `shutil.rmtree` behaviour for this test case. When the patched function is called, the registry of `FileDouble` instances for this test case will be used to get the instance for the path specified. """ orig_shutil_rmtree = os.rmdir def fake_shutil_rmtree(path, ignore_errors=False, onerror=None): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.shutil_rmtree_scenario.call_hook() else: result = orig_shutil_rmtree(path) return result func_patcher = mock.patch.object( shutil, "rmtree", autospec=True, side_effect=fake_shutil_rmtree) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_tempfile_mkdtemp(testcase): """ Patch the `tempfile.mkdtemp` function for this test case. """ if not hasattr(testcase, 'tempfile_mkdtemp_file_double'): testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp()) double = testcase.tempfile_mkdtemp_file_double double.set_os_unlink_scenario('okay') double.set_os_rmdir_scenario('okay') double.register_for_testcase(testcase) func_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True) func_patcher.start() testcase.addCleanup(func_patcher.stop) tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path try: FileNotFoundError FileExistsError PermissionError except NameError: # Python 2 uses IOError. def _ensure_ioerror_args(init_args, init_kwargs, errno_value): result_kwargs = init_kwargs result_errno = errno_value result_strerror = os.strerror(errno_value) result_filename = None if len(init_args) >= 3: result_errno = init_args[0] result_filename = init_args[2] if 'errno' in init_kwargs: result_errno = init_kwargs['errno'] del result_kwargs['errno'] if 'filename' in init_kwargs: result_filename = init_kwargs['filename'] del result_kwargs['filename'] if len(init_args) >= 2: result_strerror = init_args[1] if 'strerror' in init_kwargs: result_strerror = init_kwargs['strerror'] del result_kwargs['strerror'] result_args = (result_errno, result_strerror, result_filename) return (result_args, result_kwargs) class FileNotFoundError(IOError): def __init__(self, *args, **kwargs): (args, kwargs) = _ensure_ioerror_args( args, kwargs, errno_value=errno.ENOENT) super(FileNotFoundError, self).__init__(*args, **kwargs) class FileExistsError(IOError): def __init__(self, *args, **kwargs): (args, kwargs) = _ensure_ioerror_args( args, kwargs, errno_value=errno.EEXIST) super(FileExistsError, self).__init__(*args, **kwargs) class PermissionError(IOError): def __init__(self, *args, **kwargs): (args, kwargs) = _ensure_ioerror_args( args, kwargs, errno_value=errno.EPERM) super(PermissionError, self).__init__(*args, **kwargs) def make_fake_file_scenarios(path=None): """ Make a collection of scenarios for testing with fake files. :path: The filesystem path of the fake file. If not specified, a valid random path will be generated. :return: A collection of scenarios for tests involving input files. The collection is a mapping from scenario name to a dictionary of scenario attributes. """ if path is None: file_path = tempfile.mktemp() else: file_path = path fake_file_empty = StringIO() fake_file_minimal = StringIO("Lorem ipsum.") fake_file_large = StringIO("\n".join( "ABCDEFGH" * 100 for __ in range(1000))) default_scenario_params = { 'open_scenario_name': 'okay', 'file_double_params': dict( path=file_path, fake_file=fake_file_minimal), } scenarios = { 'default': {}, 'error-not-exist': { 'open_scenario_name': 'nonexist', }, 'error-exist': { 'open_scenario_name': 'exist_error', }, 'error-read-denied': { 'open_scenario_name': 'read_denied', }, 'not-found': { 'file_double_params': dict( path=file_path, fake_file=fake_file_empty), }, 'exist-empty': { 'file_double_params': dict( path=file_path, fake_file=fake_file_empty), }, 'exist-minimal': { 'file_double_params': dict( path=file_path, fake_file=fake_file_minimal), }, 'exist-large': { 'file_double_params': dict( path=file_path, fake_file=fake_file_large), }, } for (name, scenario) in scenarios.items(): params = default_scenario_params.copy() params.update(scenario) scenario.update(params) scenario['file_double'] = FileDouble(**scenario['file_double_params']) scenario['file_double'].set_open_scenario(params['open_scenario_name']) scenario['fake_file_scenario_name'] = name return scenarios def get_file_doubles_from_fake_file_scenarios(scenarios): """ Get the `FileDouble` instances from fake file scenarios. :param scenarios: Collection of fake file scenarios. :return: Collection of `FileDouble` instances. """ doubles = set( scenario['file_double'] for scenario in scenarios if scenario['file_double'] is not None) return doubles def setup_file_double_behaviour(testcase, doubles=None): """ Set up file double instances and behaviour. :param testcase: The `TestCase` instance to modify. :param doubles: Collection of `FileDouble` instances. :return: None. If `doubles` is ``None``, a default collection will be made from the result of `make_fake_file_scenarios` result. """ if doubles is None: scenarios = make_fake_file_scenarios() doubles = get_file_doubles_from_fake_file_scenarios( scenarios.values()) for file_double in doubles: file_double.register_for_testcase(testcase) orig_open = builtins.open def fake_open(path, mode='rt', buffering=-1): registry = FileDouble.get_registry_for_testcase(testcase) if path in registry: file_double = registry[path] result = file_double.builtins_open_scenario.call_hook( mode, buffering) else: result = orig_open(path, mode, buffering) return result mock_open = mock.mock_open() mock_open.side_effect = fake_open func_patcher = mock.patch.object( builtins, "open", new=mock_open) func_patcher.start() testcase.addCleanup(func_patcher.stop) def setup_fake_file_fixtures(testcase): """ Set up fixtures for fake file doubles. :param testcase: The `TestCase` instance to modify. :return: None. """ scenarios = make_fake_file_scenarios() testcase.fake_file_scenarios = scenarios file_doubles = get_file_doubles_from_fake_file_scenarios( scenarios.values()) setup_file_double_behaviour(testcase, file_doubles) def set_fake_file_scenario(testcase, name): """ Set the named fake file scenario for the test case. """ scenario = testcase.fake_file_scenarios[name] testcase.fake_file_scenario = scenario testcase.file_double = scenario['file_double'] testcase.file_double.register_for_testcase(testcase) class TestDoubleFunctionScenario: """ Scenario for fake behaviour of a specific function. """ def __init__(self, scenario_name, double): self.scenario_name = scenario_name self.double = double self.call_hook = getattr( self, "_hook_{name}".format(name=self.scenario_name)) def __repr__(self): text = ( "<{class_name} instance: {id}" " name: {name!r}," " call_hook name: {hook_name!r}" " double: {double!r}" ">").format( class_name=self.__class__.__name__, id=id(self), name=self.scenario_name, double=self.double, hook_name=self.call_hook.__name__) return text def __eq__(self, other): result = True if not self.scenario_name == other.scenario_name: result = False if not self.double == other.double: result = False if not self.call_hook.__name__ == other.call_hook.__name__: result = False return result def __ne__(self, other): result = not self.__eq__(other) return result class os_path_exists_scenario(TestDoubleFunctionScenario): """ Scenario for `os.path.exists` behaviour. """ def _hook_exist(self): return True def _hook_not_exist(self): return False class os_access_scenario(TestDoubleFunctionScenario): """ Scenario for `os.access` behaviour. """ def _hook_okay(self, mode): return True def _hook_not_exist(self, mode): return False def _hook_read_only(self, mode): if mode & (os.W_OK | os.X_OK): result = False else: result = True return result def _hook_denied(self, mode): if mode & (os.R_OK | os.W_OK | os.X_OK): result = False else: result = True return result class os_stat_scenario(TestDoubleFunctionScenario): """ Scenario for `os.stat` behaviour. """ def _hook_okay(self): return self.double.stat_result def _hook_notfound_error(self): raise FileNotFoundError( self.double.path, "No such file or directory: {path!r}".format( path=self.double.path)) def _hook_denied_error(self): raise PermissionError( self.double.path, "Permission denied") class os_lstat_scenario(os_stat_scenario): """ Scenario for `os.lstat` behaviour. """ class os_unlink_scenario(TestDoubleFunctionScenario): """ Scenario for `os.unlink` behaviour. """ def _hook_okay(self): return None def _hook_nonexist(self): error = FileNotFoundError( self.double.path, "No such file or directory: {path!r}".format( path=self.double.path)) raise error def _hook_denied(self): error = PermissionError( self.double.path, "Permission denied") raise error class os_rmdir_scenario(TestDoubleFunctionScenario): """ Scenario for `os.rmdir` behaviour. """ def _hook_okay(self): return None def _hook_nonexist(self): error = FileNotFoundError( self.double.path, "No such file or directory: {path!r}".format( path=self.double.path)) raise error def _hook_denied(self): error = PermissionError( self.double.path, "Permission denied") raise error class shutil_rmtree_scenario(TestDoubleFunctionScenario): """ Scenario for `shutil.rmtree` behaviour. """ def _hook_okay(self): return None def _hook_nonexist(self): error = FileNotFoundError( self.double.path, "No such file or directory: {path!r}".format( path=self.double.path)) raise error def _hook_denied(self): error = PermissionError( self.double.path, "Permission denied") raise error class builtins_open_scenario(TestDoubleFunctionScenario): """ Scenario for `builtins.open` behaviour. """ def _hook_okay(self, mode, buffering): result = self.double.fake_file return result def _hook_nonexist(self, mode, buffering): if mode.startswith('r'): error = FileNotFoundError( self.double.path, "No such file or directory: {path!r}".format( path=self.double.path)) raise error result = self.double.fake_file return result def _hook_exist_error(self, mode, buffering): if mode.startswith('w') or mode.startswith('a'): error = FileExistsError( self.double.path, "File already exists: {path!r}".format( path=self.double.path)) raise error result = self.double.fake_file return result def _hook_read_denied(self, mode, buffering): if mode.startswith('r'): error = PermissionError( self.double.path, "Read denied on {path!r}".format( path=self.double.path)) raise error result = self.double.fake_file return result def _hook_write_denied(self, mode, buffering): if mode.startswith('w') or mode.startswith('a'): error = PermissionError( self.double.path, "Write denied on {path!r}".format( path=self.double.path)) raise error result = self.double.fake_file return result class TestDoubleWithRegistry: """ Abstract base class for a test double with a test case registry. """ registry_class = NotImplemented registries = NotImplemented function_scenario_params_by_class = NotImplemented def __new__(cls, *args, **kwargs): superclass = super(TestDoubleWithRegistry, cls) if superclass.__new__ is object.__new__: # The ‘object’ implementation complains about extra arguments. instance = superclass.__new__(cls) else: instance = superclass.__new__(cls, *args, **kwargs) instance.make_set_scenario_methods() return instance def __init__(self, *args, **kwargs): super(TestDoubleWithRegistry, self).__init__(*args, **kwargs) self._set_method_per_scenario() def _make_set_scenario_method(self, scenario_class, params): def method(self, name): scenario = scenario_class(name, double=self) setattr(self, scenario_class.__name__, scenario) method.__doc__ = ( """ Set the scenario for `{name}` behaviour. """ ).format(name=scenario_class.__name__) method.__name__ = str(params['set_scenario_method_name']) return method def make_set_scenario_methods(self): """ Make `set_` methods on this class. """ for (function_scenario_class, function_scenario_params) in ( self.function_scenario_params_by_class.items()): method = self._make_set_scenario_method( function_scenario_class, function_scenario_params) setattr(self.__class__, method.__name__, method) function_scenario_params['set_scenario_method'] = method def _set_method_per_scenario(self): """ Set the method to be called for each scenario. """ for function_scenario_params in ( self.function_scenario_params_by_class.values()): function_scenario_params['set_scenario_method']( self, function_scenario_params['default_scenario_name']) @classmethod def get_registry_for_testcase(cls, testcase): """ Get the FileDouble registry for the specified test case. """ # Key in a dict must be hashable. key = (testcase.__class__, id(testcase)) registry = cls.registries.setdefault(key, cls.registry_class()) return registry def get_registry_key(self): """ Get the registry key for this double. """ raise NotImplementedError def register_for_testcase(self, testcase): """ Add this instance to registry for the specified testcase. """ registry = self.get_registry_for_testcase(testcase) key = self.get_registry_key() registry[key] = self unregister_func = functools.partial( self.unregister_for_testcase, testcase) testcase.addCleanup(unregister_func) def unregister_for_testcase(self, testcase): """ Remove this instance from registry for the specified testcase. """ registry = self.get_registry_for_testcase(testcase) key = self.get_registry_key() if key in registry: registry.pop(key) def copy_fake_file(fake_file): """ Make a copy of the StringIO instance. """ fake_file_type = StringIO content = "" if fake_file is not None: fake_file_type = type(fake_file) content = fake_file.getvalue() assert issubclass(fake_file_type, object) result = fake_file_type(content) if hasattr(fake_file, 'encoding'): if not hasattr(result, 'encoding'): result.encoding = fake_file.encoding return result class FileDouble(TestDoubleWithRegistry): """ A testing double for a file. """ registry_class = dict registries = {} function_scenario_params_by_class = { os_path_exists_scenario: { 'default_scenario_name': 'not_exist', 'set_scenario_method_name': 'set_os_path_exists_scenario', }, os_access_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_os_access_scenario', }, os_stat_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_os_stat_scenario', }, os_lstat_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_os_lstat_scenario', }, builtins_open_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_open_scenario', }, os_unlink_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_os_unlink_scenario', }, os_rmdir_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_os_rmdir_scenario', }, shutil_rmtree_scenario: { 'default_scenario_name': 'okay', 'set_scenario_method_name': 'set_shutil_rmtree_scenario', }, } def __init__(self, path=None, fake_file=None, *args, **kwargs): self.path = path self.fake_file = copy_fake_file(fake_file) self.fake_file.name = path self._set_stat_result() super(FileDouble, self).__init__(*args, **kwargs) def _set_stat_result(self): """ Set the `os.stat` result for this file. """ size = len(self.fake_file.getvalue()) self.stat_result = StatResult( st_mode=0, st_ino=None, st_dev=None, st_nlink=None, st_uid=0, st_gid=0, st_size=size, st_atime=None, st_mtime=None, st_ctime=None, ) def __repr__(self): text = "FileDouble(path={path!r}, fake_file={fake_file!r})".format( path=self.path, fake_file=self.fake_file) return text def get_registry_key(self): """ Get the registry key for this double. """ result = self.path return result class os_popen_scenario(TestDoubleFunctionScenario): """ Scenario for `os.popen` behaviour. """ stream_name_by_mode = { 'w': 'stdin', 'r': 'stdout', } def _hook_success(self, argv, mode, buffering): stream_name = self.stream_name_by_mode[mode] stream_double = getattr( self.double, stream_name + '_double') result = stream_double.fake_file return result def _hook_failure(self, argv, mode, buffering): result = StringIO() return result def _hook_not_found(self, argv, mode, buffering): result = StringIO() return result class os_waitpid_scenario(TestDoubleFunctionScenario): """ Scenario for `os.waitpid` behaviour. """ def _hook_success(self, pid, options): result = (pid, EXIT_STATUS_SUCCESS) return result def _hook_failure(self, pid, options): result = (pid, EXIT_STATUS_FAILURE) return result def _hook_not_found(self, pid, options): error = OSError(errno.ECHILD) raise error class os_system_scenario(TestDoubleFunctionScenario): """ Scenario for `os.system` behaviour. """ def _hook_success(self, command): result = EXIT_STATUS_SUCCESS return result def _hook_failure(self, command): result = EXIT_STATUS_FAILURE return result def _hook_not_found(self, command): result = EXIT_STATUS_COMMAND_NOT_FOUND return result class os_spawnv_scenario(TestDoubleFunctionScenario): """ Scenario for `os.spawnv` behaviour. """ def _hook_success(self, mode, file, args): result = EXIT_STATUS_SUCCESS return result def _hook_failure(self, mode, file, args): result = EXIT_STATUS_FAILURE return result def _hook_not_found(self, mode, file, args): result = EXIT_STATUS_COMMAND_NOT_FOUND return result ARG_ANY = object() ARG_MORE = object() class PopenDouble: """ A testing double for `subprocess.Popen`. """ def __init__(self, args, *posargs, **kwargs): self.stdin = None self.stdout = None self.stderr = None self.pid = None self.returncode = None if kwargs.get('shell', False): self.argv = shlex.split(args) else: # The paramter is already a sequence of command-line arguments. self.argv = args def set_streams(self, subprocess_double, popen_kwargs): """ Set the streams on the `PopenDouble`. :param subprocess_double: The `SubprocessDouble` from which to get existing stream doubles. :param popen_kwargs: The keyword arguments to the `subprocess.Popen` call. :return: ``None``. """ for stream_name in ( name for name in ['stdin', 'stdout', 'stderr'] if name in popen_kwargs): stream_spec = popen_kwargs[stream_name] if stream_spec is subprocess.PIPE: stream_double = getattr( subprocess_double, "{name}_double".format(name=stream_name)) stream_file = stream_double.fake_file elif stream_spec is subprocess.STDOUT: stream_file = subprocess_double.stdout_double.fake_file else: stream_file = stream_spec setattr(self, stream_name, stream_file) def wait(self): """ Wait for subprocess to terminate. """ return self.returncode class subprocess_popen_scenario(TestDoubleFunctionScenario): """ Scenario for `subprocess.Popen` behaviour. """ def _hook_success(self, testcase, args, *posargs, **kwargs): double = self.double.popen_double double.set_streams(self.double, kwargs) return double def patch_subprocess_popen(testcase): """ Patch `subprocess.Popen` constructor for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_subprocess_popen = subprocess.Popen def fake_subprocess_popen(args, *posargs, **kwargs): if kwargs.get('shell', False): argv = shlex.split(args) else: argv = args registry = SubprocessDouble.get_registry_for_testcase(testcase) if argv in registry: subprocess_double = registry[argv] result = subprocess_double.subprocess_popen_scenario.call_hook( testcase, args, *posargs, **kwargs) else: result = orig_subprocess_popen(args, *posargs, **kwargs) return result func_patcher = mock.patch.object( subprocess, "Popen", autospec=True, side_effect=fake_subprocess_popen) func_patcher.start() testcase.addCleanup(func_patcher.stop) class subprocess_call_scenario(TestDoubleFunctionScenario): """ Scenario for `subprocess.call` behaviour. """ def _hook_success(self, command): result = EXIT_STATUS_SUCCESS return result def _hook_failure(self, command): result = EXIT_STATUS_FAILURE return result def _hook_not_found(self, command): result = EXIT_STATUS_COMMAND_NOT_FOUND return result def patch_subprocess_call(testcase): """ Patch `subprocess.call` function for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_subprocess_call = subprocess.call def fake_subprocess_call(command, *posargs, **kwargs): if kwargs.get('shell', False): command_argv = shlex.split(command) else: command_argv = command registry = SubprocessDouble.get_registry_for_testcase(testcase) if command_argv in registry: subprocess_double = registry[command_argv] result = subprocess_double.subprocess_call_scenario.call_hook( command) else: result = orig_subprocess_call(command, *posargs, **kwargs) return result func_patcher = mock.patch.object( subprocess, "call", autospec=True, side_effect=fake_subprocess_call) func_patcher.start() testcase.addCleanup(func_patcher.stop) class subprocess_check_call_scenario(TestDoubleFunctionScenario): """ Scenario for `subprocess.check_call` behaviour. """ def _hook_success(self, command): return None def _hook_failure(self, command): result = EXIT_STATUS_FAILURE error = subprocess.CalledProcessError(result, command) raise error def _hook_not_found(self, command): result = EXIT_STATUS_COMMAND_NOT_FOUND error = subprocess.CalledProcessError(result, command) raise error def patch_subprocess_check_call(testcase): """ Patch `subprocess.check_call` function for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_subprocess_check_call = subprocess.check_call def fake_subprocess_check_call(command, *posargs, **kwargs): if kwargs.get('shell', False): command_argv = shlex.split(command) else: command_argv = command registry = SubprocessDouble.get_registry_for_testcase(testcase) if command_argv in registry: subprocess_double = registry[command_argv] scenario = subprocess_double.subprocess_check_call_scenario result = scenario.call_hook(command) else: result = orig_subprocess_check_call(command, *posargs, **kwargs) return result func_patcher = mock.patch.object( subprocess, "check_call", autospec=True, side_effect=fake_subprocess_check_call) func_patcher.start() testcase.addCleanup(func_patcher.stop) class SubprocessDoubleRegistry(collections_abc.MutableMapping): """ Registry of `SubprocessDouble` instances by `argv`. """ def __init__(self, *args, **kwargs): items = [] if args: if isinstance(args[0], collections_abc.Mapping): items = args[0].items() if isinstance(args[0], collections_abc.Iterable): items = args[0] self._mapping = dict(items) def __repr__(self): text = "<{class_name} object: {mapping}>".format( class_name=self.__class__.__name__, mapping=self._mapping) return text def _match_argv(self, argv): """ Match the specified `argv` with our registered keys. """ match = None if not isinstance(argv, collections_abc.Sequence): return match candidates = iter(self._mapping) while match is None: try: candidate = next(candidates) except StopIteration: break found = None if candidate == argv: # An exact match. found = True word_iter = enumerate(candidate) while found is None: try: (word_index, candidate_word) = next(word_iter) except StopIteration: break if candidate_word is ARG_MORE: # Candiate matches any remaining words. We have a match. found = True elif word_index > len(argv): # Candidate is too long for the specified argv. found = False elif candidate_word is ARG_ANY: # Candidate matches any word at this position. continue elif candidate_word == argv[word_index]: # Candidate matches the word at this position. continue else: # This candidate does not match. found = False if found is None: # Reached the end of the candidate without a mismatch. found = True if found: match = candidate return match def __getitem__(self, key): match = self._match_argv(key) if match is None: raise KeyError(key) result = self._mapping[match] return result def __setitem__(self, key, value): if key in self: del self[key] self._mapping[key] = value def __delitem__(self, key): match = self._match_argv(key) if match is not None: del self._mapping[match] def __iter__(self): return self._mapping.__iter__() def __len__(self): return self._mapping.__len__() class SubprocessDouble(TestDoubleWithRegistry): """ A testing double for a subprocess. """ registry_class = SubprocessDoubleRegistry registries = {} double_by_pid = weakref.WeakValueDictionary() function_scenario_params_by_class = { subprocess_popen_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_subprocess_popen_scenario', }, subprocess_call_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_subprocess_call_scenario', }, subprocess_check_call_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_subprocess_check_call_scenario', }, os_popen_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_os_popen_scenario', }, os_waitpid_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_os_waitpid_scenario', }, os_system_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_os_system_scenario', }, os_spawnv_scenario: { 'default_scenario_name': 'success', 'set_scenario_method_name': 'set_os_spawnv_scenario', }, } def __init__(self, path=None, argv=None, *args, **kwargs): if path is None: path = tempfile.mktemp() self.path = path if argv is None: command_name = os.path.basename(path) argv = [command_name] self.argv = argv self.pid = self._make_pid() self._register_by_pid() self.set_popen_double() stream_class = SubprocessDouble.stream_class for stream_name in ['stdin', 'stdout', 'stderr']: fake_file = stream_class() file_double = FileDouble(fake_file=fake_file) stream_double_name = '{name}_double'.format(name=stream_name) setattr(self, stream_double_name, file_double) super(SubprocessDouble, self).__init__(*args, **kwargs) def set_popen_double(self): """ Set the `PopenDouble` for this instance. """ double = PopenDouble(self.argv) double.pid = self.pid self.popen_double = double def __repr__(self): text = ( "").format( id=id(self), path=self.path, argv=self.argv, stdin_double=self.stdin_double, stdout_double=self.stdout_double, stderr_double=self.stderr_double) return text @classmethod def _make_pid(cls): """ Make a unique PID for a subprocess. """ for pid in itertools.count(1): yield pid def _register_by_pid(self): """ Register this subprocess by its PID. """ self.__class__.double_by_pid[self.pid] = self def get_registry_key(self): """ Get the registry key for this double. """ result = tuple(self.argv) return result stream_class = io.BytesIO stream_encoding = "utf-8" def set_stdin_content(self, text, bytes_encoding=stream_encoding): """ Set the content of the `stdin` stream for this double. """ content = text.encode(bytes_encoding) fake_file = self.stream_class(content) self.stdin_double.fake_file = fake_file def set_stdout_content(self, text, bytes_encoding=stream_encoding): """ Set the content of the `stdout` stream for this double. """ content = text.encode(bytes_encoding) fake_file = self.stream_class(content) self.stdout_double.fake_file = fake_file def set_stderr_content(self, text, bytes_encoding=stream_encoding): """ Set the content of the `stderr` stream for this double. """ content = text.encode(bytes_encoding) fake_file = self.stream_class(content) self.stderr_double.fake_file = fake_file def make_fake_subprocess_scenarios(path=None): """ Make a collection of scenarios for testing with fake files. :path: The filesystem path of the fake program. If not specified, a valid random path will be generated. :return: A collection of scenarios for tests involving subprocesses. The collection is a mapping from scenario name to a dictionary of scenario attributes. """ if path is None: file_path = tempfile.mktemp() else: file_path = path default_scenario_params = { 'return_value': EXIT_STATUS_SUCCESS, 'program_path': file_path, 'argv_after_command_name': [], } scenarios = { 'default': {}, 'not-found': { 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND, }, } for (name, scenario) in scenarios.items(): params = default_scenario_params.copy() params.update(scenario) scenario.update(params) program_path = params['program_path'] program_name = os.path.basename(params['program_path']) argv = [program_name] argv.extend(params['argv_after_command_name']) subprocess_double_params = dict( path=program_path, argv=argv, ) subprocess_double = SubprocessDouble(**subprocess_double_params) scenario['subprocess_double'] = subprocess_double scenario['fake_file_scenario_name'] = name return scenarios def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios): """ Get the `SubprocessDouble` instances from fake subprocess scenarios. :param scenarios: Collection of fake subprocess scenarios. :return: Collection of `SubprocessDouble` instances. """ doubles = set( scenario['subprocess_double'] for scenario in scenarios if scenario['subprocess_double'] is not None) return doubles def setup_subprocess_double_behaviour(testcase, doubles=None): """ Set up subprocess double instances and behaviour. :param testcase: The `TestCase` instance to modify. :param doubles: Collection of `SubprocessDouble` instances. :return: None. If `doubles` is ``None``, a default collection will be made from the return value of `make_fake_subprocess_scenarios`. """ if doubles is None: scenarios = make_fake_subprocess_scenarios() doubles = get_subprocess_doubles_from_fake_subprocess_scenarios( scenarios.values()) for double in doubles: double.register_for_testcase(testcase) def setup_fake_subprocess_fixtures(testcase): """ Set up fixtures for fake subprocess doubles. :param testcase: The `TestCase` instance to modify. :return: None. """ scenarios = make_fake_subprocess_scenarios() testcase.fake_subprocess_scenarios = scenarios doubles = get_subprocess_doubles_from_fake_subprocess_scenarios( scenarios.values()) setup_subprocess_double_behaviour(testcase, doubles) def patch_os_popen(testcase): """ Patch `os.popen` behaviour for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_os_popen = os.popen def fake_os_popen(cmd, mode='r', buffering=-1): registry = SubprocessDouble.get_registry_for_testcase(testcase) if isinstance(cmd, basestring): command_argv = shlex.split(cmd) else: command_argv = cmd if command_argv in registry: subprocess_double = registry[command_argv] result = subprocess_double.os_popen_scenario.call_hook( command_argv, mode, buffering) else: result = orig_os_popen(cmd, mode, buffering) return result func_patcher = mock.patch.object( os, "popen", autospec=True, side_effect=fake_os_popen) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_waitpid(testcase): """ Patch `os.waitpid` behaviour for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_os_waitpid = os.waitpid def fake_os_waitpid(pid, options): registry = SubprocessDouble.double_by_pid if pid in registry: subprocess_double = registry[pid] result = subprocess_double.os_waitpid_scenario.call_hook( pid, options) else: result = orig_os_waitpid(pid, options) return result func_patcher = mock.patch.object( os, "waitpid", autospec=True, side_effect=fake_os_waitpid) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_system(testcase): """ Patch `os.system` behaviour for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_os_system = os.system def fake_os_system(command): registry = SubprocessDouble.get_registry_for_testcase(testcase) command_argv = shlex.split(command) if command_argv in registry: subprocess_double = registry[command_argv] result = subprocess_double.os_system_scenario.call_hook( command) else: result = orig_os_system(command) return result func_patcher = mock.patch.object( os, "system", autospec=True, side_effect=fake_os_system) func_patcher.start() testcase.addCleanup(func_patcher.stop) def patch_os_spawnv(testcase): """ Patch `os.spawnv` behaviour for this test case. :param testcase: The `TestCase` instance to modify. :return: None. When the patched function is called, the registry of `SubprocessDouble` instances for this test case will be used to get the instance for the program path specified. """ orig_os_spawnv = os.spawnv def fake_os_spawnv(mode, file, args): registry = SubprocessDouble.get_registry_for_testcase(testcase) registry_key = tuple(args) if registry_key in registry: subprocess_double = registry[registry_key] result = subprocess_double.os_spawnv_scenario.call_hook( mode, file, args) else: result = orig_os_spawnv(mode, file, args) return result func_patcher = mock.patch.object( os, "spawnv", autospec=True, side_effect=fake_os_spawnv) func_patcher.start() testcase.addCleanup(func_patcher.stop) # Local variables: # coding: utf-8 # mode: python # End: # vim: fileencoding=utf-8 filetype=python : PKB=HM%gajja-0.1.1.dist-info/DESCRIPTION.rstThe `gajja` library provides a system of Python test double classes for specific system objects:: * Filesystem entries * Subprocesses The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”. PKB=H%hQQ#gajja-0.1.1.dist-info/metadata.json{"classifiers": ["Development Status :: 3 - Alpha", "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", "Operating System :: POSIX", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Intended Audience :: Developers", "Topic :: Software Development :: Testing"], "extensions": {"python.details": {"contacts": [{"email": "ben+python@benfinney.id.au", "name": "Ben Finney", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://notabug.org/bignose/python-gajja"}}}, "extras": [], "generator": "bdist_wheel (0.26.0)", "keywords": ["test", "double", "fake", "mock", "filesystem", "subprocess"], "license": "GPLv3+", "metadata_version": "2.0", "name": "gajja", "run_requires": [{"requires": ["mock (>=1.3)", "setuptools", "testtools", "unittest2 (>=0.5.1)"]}], "summary": "Gajja: Fake objects for real tests", "test_requires": [{"requires": ["mock (>=1.3)", "testtools", "unittest2 (>=0.5.1)"]}], "version": "0.1.1"}PKB=H*#gajja-0.1.1.dist-info/top_level.txtgajja PKB=H}\\gajja-0.1.1.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py3-none-any PKB=H%{AAgajja-0.1.1.dist-info/METADATAMetadata-Version: 2.0 Name: gajja Version: 0.1.1 Summary: Gajja: Fake objects for real tests Home-page: https://notabug.org/bignose/python-gajja Author: Ben Finney Author-email: ben+python@benfinney.id.au License: GPLv3+ Keywords: test,double,fake,mock,filesystem,subprocess Platform: UNKNOWN Classifier: Development Status :: 3 - Alpha Classifier: License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+) Classifier: Operating System :: POSIX Classifier: Programming Language :: Python :: 2.7 Classifier: Programming Language :: Python :: 3.3 Classifier: Programming Language :: Python :: 3.4 Classifier: Programming Language :: Python :: 3.5 Classifier: Intended Audience :: Developers Classifier: Topic :: Software Development :: Testing Requires-Dist: mock (>=1.3) Requires-Dist: setuptools Requires-Dist: testtools Requires-Dist: unittest2 (>=0.5.1) The `gajja` library provides a system of Python test double classes for specific system objects:: * Filesystem entries * Subprocesses The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”. PKB=HȂ,,gajja-0.1.1.dist-info/RECORDgajja/__init__.py,sha256=3y3vZ2ZOj3e2DI9f1hnBjd06IOhYBncRLYtcYBGDWuA,55838 gajja-0.1.1.dist-info/DESCRIPTION.rst,sha256=5z4KqXWuMGu-4C6hRBsDYdAcdXaG8X4CpvrUyFg9m-A,215 gajja-0.1.1.dist-info/METADATA,sha256=fjkruLINNROibEgX7ARwuUxwdN00hJOT7AiwmRdVvDo,1089 gajja-0.1.1.dist-info/RECORD,, gajja-0.1.1.dist-info/WHEEL,sha256=zX7PHtH_7K-lEzyK75et0UBa3Bj8egCBMXe1M4gc6SU,92 gajja-0.1.1.dist-info/metadata.json,sha256=5iEk4IXugNwgmzge1xRzjxUA0hdtUXwB9DpKiI8PSfg,1105 gajja-0.1.1.dist-info/top_level.txt,sha256=faJd-kylK_-Zu52Nr1taBPjd1Se0gwgEpUzUCPdwRHM,6 PK6=H`Zgajja/__init__.pyPKB=HM%Mgajja-0.1.1.dist-info/DESCRIPTION.rstPKB=H%hQQ#ggajja-0.1.1.dist-info/metadata.jsonPKB=H*#gajja-0.1.1.dist-info/top_level.txtPKB=H}\\@gajja-0.1.1.dist-info/WHEELPKB=H%{AAgajja-0.1.1.dist-info/METADATAPKB=HȂ,,Rgajja-0.1.1.dist-info/RECORDPK