PK!1gitman/__init__.py"""Package for GitMan.""" from pkg_resources import get_distribution from .commands import ( # pylint: disable=redefined-builtin delete as uninstall, display as list, init, install, lock, update, ) __version__ = get_distribution('gitman').version PK!TNNgitman/__main__.py"""Package entry point.""" import importlib import os import sys from gitman.cli import main # Declare itself as package if needed for better debugging support # pylint: disable=multiple-imports,wrong-import-position,redefined-builtin,used-before-assignment if __name__ == '__main__' and __package__ is None: # pragma: no cover parent_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(os.path.dirname(parent_dir)) __package__ = os.path.basename(parent_dir) importlib.import_module(__package__) if __name__ == '__main__': # pragma: no cover main() PK!B$$ gitman/cli.py#!/usr/bin/env python3 """Command-line interface.""" import argparse import logging import sys from typing import Dict, List from . 
import __version__, commands, common, exceptions log = logging.getLogger(__name__) def main(args=None, function=None): # pylint: disable=too-many-statements """Process command-line arguments and run the program.""" # Shared options debug = argparse.ArgumentParser(add_help=False) debug.add_argument( '-V', '--version', action='version', version="GitMan v" + __version__ ) debug_group = debug.add_mutually_exclusive_group() debug_group.add_argument( '-v', '--verbose', action='count', default=0, help="enable verbose logging" ) debug_group.add_argument( '-q', '--quiet', action='store_const', const=-1, dest='verbose', help="only display errors and prompts", ) project = argparse.ArgumentParser(add_help=False) project.add_argument( '-r', '--root', metavar='PATH', help="root directory of the project" ) depth = argparse.ArgumentParser(add_help=False) depth.add_argument( '-d', '--depth', type=common.positive_int, default=5, metavar="NUM", help="limit the number of dependency levels", ) options = argparse.ArgumentParser(add_help=False) options.add_argument( '-c', '--clean', action='store_true', help="delete ignored files in dependencies", ) options_group = options.add_mutually_exclusive_group() options_group.add_argument( '-f', '--force', action='store_true', help="overwrite uncommitted changes in dependencies", ) options_group.add_argument( '-s', '--skip-changes', action='store_true', dest='skip_changes', help="skip dependencies with uncommitted changes", ) # Main parser parser = argparse.ArgumentParser( prog='gitman', description="A language-agnostic dependency manager using Git.", parents=[debug], formatter_class=common.WideHelpFormatter, ) subs = parser.add_subparsers(help="", dest='command', metavar="") # Init parser info = "create a new config file for the project" sub = subs.add_parser( 'init', description=info.capitalize() + '.', help=info, parents=[debug], formatter_class=common.WideHelpFormatter, ) # Install parser info = "get the specified versions of all 
dependencies" sub = subs.add_parser( 'install', description=info.capitalize() + '.', help=info, parents=[debug, project, depth, options], formatter_class=common.WideHelpFormatter, ) sub.add_argument('name', nargs='*', help="list of dependencies names to install") sub.add_argument( '-e', '--fetch', action='store_true', help="always fetch the latest branches" ) # Update parser info = "update dependencies to the latest versions" sub = subs.add_parser( 'update', description=info.capitalize() + '.', help=info, parents=[debug, project, depth, options], formatter_class=common.WideHelpFormatter, ) sub.add_argument('name', nargs='*', help="list of dependencies names to update") sub.add_argument( '-a', '--all', action='store_true', dest='recurse', help="also update all nested dependencies", ) sub.add_argument( '-L', '--skip-lock', action='store_false', dest='lock', default=None, help="disable recording of updated versions", ) # List parser info = "display the current version of each dependency" sub = subs.add_parser( 'list', description=info.capitalize() + '.', help=info, parents=[debug, project, depth], formatter_class=common.WideHelpFormatter, ) sub.add_argument( '-D', '--fail-if-dirty', action='store_false', dest='allow_dirty', help="fail if a source has uncommitted changes", ) # Lock parser info = "lock the current version of each dependency" sub = subs.add_parser( 'lock', description=info.capitalize() + '.', help=info, parents=[debug, project], formatter_class=common.WideHelpFormatter, ) sub.add_argument('name', nargs='*', help="list of dependency names to lock") # Uninstall parser info = "delete all installed dependencies" sub = subs.add_parser( 'uninstall', description=info.capitalize() + '.', help=info, parents=[debug, project], formatter_class=common.WideHelpFormatter, ) sub.add_argument( '-f', '--force', action='store_true', help="delete uncommitted changes in dependencies", ) sub.add_argument( '-k', '--keep-location', dest='keep_location', default=False, 
action='store_true', help="keep top level folder location", ) # Show parser info = "display the path of a dependency or internal file" sub = subs.add_parser( 'show', description=info.capitalize() + '.', help=info, parents=[debug, project], formatter_class=common.WideHelpFormatter, ) sub.add_argument('name', nargs='*', help="display the path of this dependency") sub.add_argument( '-c', '--config', action='store_true', help="display the path of the config file", ) sub.add_argument( '-l', '--log', action='store_true', help="display the path of the log file" ) # Edit parser info = "open the config file in the default editor" sub = subs.add_parser( 'edit', description=info.capitalize() + '.', help=info, parents=[debug, project], formatter_class=common.WideHelpFormatter, ) # Parse arguments namespace = parser.parse_args(args=args) # Configure logging common.configure_logging(namespace.verbose) # Run the program function, args, kwargs = _get_command(function, namespace) if function: _run_command(function, args, kwargs) else: parser.print_help() sys.exit(1) def _get_command(function, namespace): # pylint: disable=too-many-statements args: List = [] kwargs: Dict = {} if namespace.command == 'init': function = commands.init elif namespace.command in ['install', 'update']: function = getattr(commands, namespace.command) args = namespace.name kwargs.update( root=namespace.root, depth=namespace.depth, force=namespace.force, clean=namespace.clean, skip_changes=namespace.skip_changes, ) if namespace.command == 'install': kwargs.update(fetch=namespace.fetch) if namespace.command == 'update': kwargs.update(recurse=namespace.recurse, lock=namespace.lock) elif namespace.command == 'list': function = commands.display kwargs.update( root=namespace.root, depth=namespace.depth, allow_dirty=namespace.allow_dirty, ) elif namespace.command == 'lock': function = getattr(commands, namespace.command) args = namespace.name kwargs.update(root=namespace.root) elif namespace.command == 
def _show_error(exception):
    """Display an exception's message as an error, framed by blank lines.

    - `exception`: the exception whose `str()` form is shown

    """
    # TODO: require level=, evaluate all calls to dedent()
    common.dedent(0)  # reset the output indent to level 0 before reporting
    common.newline()
    common.show(str(exception), color='error')
    common.newline()
def restore_cwd(func):
    """Decorate `func` so the caller's working directory is always restored.

    The working directory is captured before `func` runs and restored
    afterwards, both when `func` returns and when it raises.

    - `func`: the callable to wrap

    Returns the wrapped callable, which passes all arguments through to
    `func` and returns its result unchanged.

    """

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        cwd = os.getcwd()
        try:
            return func(*args, **kwargs)
        finally:
            # Restore on the error path too; otherwise a failing command
            # leaves the process inside whatever directory it had entered.
            os.chdir(cwd)

    return wrapped
Optional arguments: - `*names`: optional list of dependency directory names to filter on - `root`: specifies the path to the root working tree - `depth`: number of levels of dependencies to traverse - `force`: indicates uncommitted changes can be overwritten and script errors can be ignored - `fetch`: indicates the latest branches should always be fetched - `clean`: indicates untracked files should be deleted from dependencies - `skip_changes`: indicates dependencies with uncommitted changes should be skipped """ log.info( "%sInstalling dependencies: %s", 'force-' if force else '', ', '.join(names) if names else '', ) count = None config = load_config(root) if config: common.newline() common.show("Installing dependencies...", color='message', log=False) common.newline() count = config.install_dependencies( *names, update=False, depth=depth, force=force, fetch=fetch, clean=clean, skip_changes=skip_changes, ) if count: _run_scripts(*names, depth=depth, force=force, _config=config) return _display_result("install", "Installed", count) @restore_cwd def update( *names, root=None, depth=None, recurse=False, force=False, clean=True, lock=None, # pylint: disable=redefined-outer-name skip_changes=False, ): """Update dependencies for a project. 
@restore_cwd
def display(*, root=None, depth=None, allow_dirty=True):
    """Display installed dependencies for a project.

    Optional arguments:

    - `root`: specifies the path to the root working tree
    - `depth`: number of levels of dependencies to traverse
    - `allow_dirty`: causes uncommitted changes to be ignored

    Returns the status computed by `_display_result` from the number of
    dependencies that were displayed (`None` when no config was found).

    """
    log.info("Displaying dependencies...")
    count = None

    config = load_config(root)

    if config:
        common.newline()
        common.show(
            "Displaying current dependency versions...", color='message', log=False
        )
        common.newline()
        # "%F %T" are C-library extensions that are not available on every
        # platform; the standard directives below produce the same text.
        config.log(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        count = 0
        for identity in config.get_dependencies(depth=depth, allow_dirty=allow_dirty):
            count += 1
            config.log("{}: {} @ {}", *identity)
        config.log()

    return _display_result("display", "Displayed", count)
def edit(*, root=None):
    """Open the configuration file for a project.

    Optional arguments:

    - `root`: specifies the path to the root working tree

    Returns `False` when no config is found, otherwise the result of
    `system.launch` for the config file path.

    """
    log.info("Launching config...")

    config = load_config(root)

    if not config:
        log.error("No config found")
        return False

    return system.launch(config.path)
def positive_int(value):
    """Convert `value` to an integer and require it to be at least 1.

    - `value`: anything accepted by `int()` (e.g. a command-line string)

    Returns the converted integer.

    Raises `TypeError` when the integer is smaller than 1, and whatever
    `int()` raises (e.g. `ValueError`) when `value` cannot be converted.

    """
    number = int(value)
    if number < 1:
        # Same exception type as before, now with a message explaining why.
        raise TypeError(f"expected a positive integer, got {value!r}")
    return number
def show(*messages, file=None, log=_log, **kwargs):
    """Write to standard output or error if enabled.

    - `*messages`: lines of text to display
    - `file`: stream to print to; defaults to the current `sys.stdout`
    - `log`: logger used instead of printing when verbosity is 1 or more
      (pass a false value to suppress logging)
    - `color`: (keyword, required for non-empty messages) style name
      passed to `style()`

    At verbosity 0 each message is printed with the current indent; at
    verbosity 1 or more non-empty messages are logged instead.

    """
    if any(messages):
        assert 'color' in kwargs, "Color is required"

    color = kwargs.pop('color', None)

    if file is None:
        # Look up stdout at call time: a `sys.stdout` default would be bound
        # once at import and ignore any later redirection of the stream.
        file = sys.stdout

    for message in messages:
        if _Config.verbosity == 0:
            text = ' ' * 2 * _Config.indent_level + style(message, color)
            print(text, file=file)
        elif _Config.verbosity >= 1:
            message = message.strip()
            if message and log:
                if color == 'error':
                    log.error(message)
                else:
                    log.info(message)
def style(msg, name=None, *, _color_support=False):
    """Wrap `msg` in the ANSI color sequence registered for style `name`.

    - `msg`: text to colorize
    - `name`: key into `COLORS`
    - `_color_support`: force coloring even when stdout is not a
      color-capable terminal

    Returns `msg` unchanged when coloring is unavailable or when the style
    has no color sequence.

    """
    stdout = sys.stdout
    interactive = hasattr(stdout, 'isatty') and stdout.isatty()
    ansi_capable = 'ANSICON' in os.environ or sys.platform != 'win32'
    if not _color_support and not (interactive and ansi_capable):
        return msg

    if name == 'shell':
        # Only the prompt marker is highlighted, not the whole command line
        prompt = "$ "
        return msg.replace(prompt, COLORS[name] + prompt + RESET)

    code = COLORS.get(name)
    if code:
        return code + msg + RESET

    # An empty sequence is a known style without color; `None` is unknown
    assert not msg or code is not None, "Unknown style name requested: {!r}".format(
        name
    )

    return msg
def clone(type, repo, path, *, cache=settings.CACHE, sparse_paths=None, rev=None):
    """Clone a new Git repository.

    - `type`: `'git'` or `'git-svn'`
    - `repo`: URL (or path) of the repository to clone
    - `path`: directory to create the working tree in
    - `cache`: directory holding `<name>.reference` mirror clones
    - `sparse_paths`: optional list of sparse-checkout patterns
    - `rev`: revision pulled from `origin` when `sparse_paths` is given

    For `'git-svn'` only the target directory is created here.

    """
    log.debug("Creating a new repository...")

    if type == 'git-svn':
        # Just the preparation for the svn deep clone / checkout here;
        # the clone is made in the update function to simplify source.py.
        os.makedirs(path)
        return

    assert type == 'git'

    name = repo.split('/')[-1]
    if name.endswith(".git"):
        name = name[:-4]

    normpath = os.path.normpath(path)
    reference = os.path.join(cache, name + ".reference")
    sparse_paths_repo = repo if settings.CACHE_DISABLE else reference

    if not settings.CACHE_DISABLE and not os.path.isdir(reference):
        git('clone', '--mirror', repo, reference)

    if sparse_paths:
        os.mkdir(normpath)
        git('-C', normpath, 'init')
        git('-C', normpath, 'config', 'core.sparseCheckout', 'true')
        git('-C', normpath, 'remote', 'add', '-f', 'origin', sparse_paths_repo)

        # `abspath` handles both relative and absolute `path` values, whereas
        # prefixing the current directory by hand breaks for absolute paths.
        git_dir = os.path.join(os.path.abspath(normpath), '.git')

        with open(os.path.join(git_dir, 'info', 'sparse-checkout'), 'w') as fd:
            # The sparse-checkout file holds one pattern per line; writing the
            # raw list would run multiple patterns together.
            fd.writelines(pattern.rstrip('\n') + '\n' for pattern in sparse_paths)
        with open(os.path.join(git_dir, 'objects', 'info', 'alternates'), 'w') as fd:
            # NOTE(review): with the cache disabled this is the remote URL,
            # not a local object directory -- confirm that is intended.
            fd.write("%s/objects" % sparse_paths_repo)

        # We use directly the revision requested here in order to respect
        # that not all repos have `master` as their default branch
        git('-C', normpath, 'pull', 'origin', rev)
    elif settings.CACHE_DISABLE:
        git('clone', repo, normpath)
    else:
        git('clone', '--reference', reference, repo, normpath)
""" return re.match('^[0-9a-f]{7,40}$', rev) is not None def fetch(type, repo, path, rev=None): # pylint: disable=unused-argument """Fetch the latest changes from the remote repository.""" if type == 'git-svn': # deep clone happens in update function return assert type == 'git' git('remote', 'set-url', 'origin', repo) args = ['fetch', '--tags', '--force', '--prune', 'origin'] if rev: if is_sha(rev): pass # fetch only works with a SHA if already present locally elif '@' in rev: pass # fetch doesn't work with rev-parse else: args.append(rev) git(*args) def valid(): """Confirm the current directory is a valid working tree.""" log.debug("Checking for a valid working tree...") try: git('rev-parse', '--is-inside-work-tree', _show=False) except ShellError: return False else: return True def changes(type, include_untracked=False, display_status=True, _show=False): """Determine if there are changes in the working tree.""" status = False if type == 'git-svn': # ignore changes in case of git-svn return status assert type == 'git' try: # Refresh changes git('update-index', '-q', '--refresh', _show=False) # Check for uncommitted changes git('diff-index', '--quiet', 'HEAD', _show=_show) # Check for untracked files lines = git('ls-files', '--others', '--exclude-standard', _show=_show) except ShellError: status = True else: status = bool(lines) and include_untracked if status and display_status: with suppress(ShellError): lines = git('status', _show=True) common.show(*lines, color='git_changes') return status def update( type, repo, path, *, clean=True, fetch=False, rev=None ): # pylint: disable=redefined-outer-name,unused-argument if type == 'git-svn': # make deep clone here for simplification of sources.py # and to realize consistent readonly clone (always forced) # completly empty current directory (remove also hidden content) for root, dirs, files in os.walk('.'): for f in files: os.unlink(os.path.join(root, f)) for d in dirs: shutil.rmtree(os.path.join(root, d)) # clone 
specified svn revision gitsvn('clone', '-r', rev, repo, '.') return assert type == 'git' # Update the working tree to the specified revision. hide = {'_show': False, '_ignore': True} git('stash', **hide) if clean: git('clean', '--force', '-d', '-x', _show=False) rev = _get_sha_from_rev(rev) git('checkout', '--force', rev) git('branch', '--set-upstream-to', 'origin/' + rev, **hide) if fetch: # if `rev` was a branch it might be tracking something older git('pull', '--ff-only', '--no-rebase', **hide) def get_url(type): """Get the current repository's URL.""" if type == 'git-svn': return git('config', '--get', 'svn-remote.svn.url', _show=False)[0] assert type == 'git' return git('config', '--get', 'remote.origin.url', _show=False)[0] def get_hash(type, _show=False): """Get the current working tree's hash.""" if type == 'git-svn': return ''.join(filter(str.isdigit, gitsvn('info', _show=_show)[4])) assert type == 'git' return git('rev-parse', 'HEAD', _show=_show)[0] def get_tag(): """Get the current working tree's tag (if on a tag).""" return git('describe', '--tags', '--exact-match', _show=False, _ignore=True)[0] def is_fetch_required(type, rev): if type == 'git-svn': return False assert type == 'git' return rev not in (get_branch(), get_hash(type), get_tag()) def get_branch(): """Get the current working tree's branch.""" return git('rev-parse', '--abbrev-ref', 'HEAD', _show=False)[0] def _get_sha_from_rev(rev): """Get a rev-parse string's hash.""" if '@{' in rev: # TODO: use regex for this parts = rev.split('@') branch = parts[0] date = parts[1].strip("{}") git('checkout', '--force', branch, _show=False) rev = git( 'rev-list', '-n', '1', '--before={!r}'.format(date), '--first-parent', branch, _show=False, )[0] return rev PK!-QQgitman/models/__init__.pyfrom .source import Source # isort:skip from .config import Config, load_config PK!$''gitman/models/config.pyimport logging import os from typing import List import yorm from yorm.types import SortedList, String from .. 
def get_path(self, name=None):
    """Return the full path of a dependency or of an internal file.

    - `name`: a dependency name, `'__config__'` for the config file,
      `'__log__'` for the log file, or empty for the storage location

    """
    location = self.location_path

    if not name:
        return location
    if name == '__config__':
        return self.path
    if name == '__log__':
        return self.log_path

    return os.path.normpath(os.path.join(location, name))
os.path.isdir(self.location_path): shell.mkdir(self.location_path) shell.cd(self.location_path) common.newline() common.indent() count = 0 for source in sources: if source.name in sources_filter: sources_filter.remove(source.name) else: log.info("Skipped dependency: %s", source.name) continue source.update_files( force=force, fetch=fetch, clean=clean, skip_changes=skip_changes ) source.create_link(self.root, force=force) common.newline() count += 1 config = load_config(search=False) if config: common.indent() count += config.install_dependencies( depth=None if depth is None else max(0, depth - 1), update=update and recurse, recurse=recurse, force=force, fetch=fetch, clean=clean, skip_changes=skip_changes, ) common.dedent() shell.cd(self.location_path, _show=False) common.dedent() if sources_filter: log.error("No such dependency: %s", ' '.join(sources_filter)) return 0 return count def run_scripts(self, *names, depth=None, force=False): """Run scripts for the specified dependencies.""" if depth == 0: log.info("Skipped directory: %s", self.location_path) return 0 sources = self._get_sources() sources_filter = list(names) if names else [s.name for s in sources] shell.cd(self.location_path) common.newline() common.indent() count = 0 for source in sources: if source.name in sources_filter: source.run_scripts(force=force) count += 1 config = load_config(search=False) if config: common.indent() count += config.run_scripts( depth=None if depth is None else max(0, depth - 1), force=force ) common.dedent() shell.cd(self.location_path, _show=False) common.dedent() return count def lock_dependencies(self, *names, obey_existing=True, skip_changes=False): """Lock down the immediate dependency versions.""" sources = self._get_sources(use_locked=obey_existing).copy() sources_filter = list(names) if names else [s.name for s in sources] shell.cd(self.location_path) common.newline() common.indent() count = 0 for source in sources: if source.name not in sources_filter: 
def log(self, message="", *args):
    """Append one formatted line to the log file at `self.log_path`.

    - `message`: `str.format` template for the line (empty by default)
    - `*args`: positional values substituted into `message`

    """
    with open(self.log_path, 'a') as outfile:
        # Format only once the file is open, then terminate the line
        outfile.write("{}\n".format(message.format(*args)))
"""Merge source lists using the requested section as the base.""" if use_locked is True: if self.sources_locked: return self.sources_locked log.info("No locked sources, defaulting to none...") return [] sources: List[Source] = [] if use_locked is False: sources = self.sources else: if self.sources_locked: log.info("Defaulting to locked sources...") sources = self.sources_locked else: log.info("No locked sources, using latest...") sources = self.sources extras = [] for source in self.sources + self.sources_locked: if source not in sources: log.info("Source %r missing from selected section", source.name) extras.append(source) return sources + extras def load_config(start=None, *, search=True): """Load the config for the current project.""" if start: start = os.path.abspath(start) else: start = os.getcwd() if search: log.debug("Searching for config...") path = start while path != os.path.dirname(path): log.debug("Looking for config in: %s", path) for filename in os.listdir(path): if _valid_filename(filename): config = Config(path, filename) config._on_post_load() # pylint: disable=protected-access log.debug("Found config: %s", config.path) return config if search: path = os.path.dirname(path) else: break if search: log.debug("No config found starting from: %s", start) else: log.debug("No config found in: %s", start) return None def _valid_filename(filename): name, ext = os.path.splitext(filename.lower()) if name.startswith('.'): name = name[1:] return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml'] PK!S7cgitman/models/source.pyimport logging import os import warnings import yorm from yorm.types import AttributeDictionary, List, NullableString, String from .. 
import common, exceptions, git, shell log = logging.getLogger(__name__) @yorm.attr(name=String) @yorm.attr(type=String) @yorm.attr(repo=String) @yorm.attr(sparse_paths=List.of_type(String)) @yorm.attr(rev=String) @yorm.attr(link=NullableString) @yorm.attr(scripts=List.of_type(String)) class Source(AttributeDictionary): """A dictionary of `git` and `ln` arguments.""" DIRTY = '' UNKNOWN = '' def __init__( self, type, repo, name=None, rev='master', link=None, scripts=None, sparse_paths=None, ): super().__init__() self.type = type or 'git' self.repo = repo self.name = self._infer_name(repo) if name is None else name self.rev = rev self.link = link self.scripts = scripts or [] self.sparse_paths = sparse_paths or [] for key in ['name', 'repo', 'rev']: if not self[key]: msg = "'{}' required for {}".format(key, repr(self)) raise exceptions.InvalidConfig(msg) def _on_post_load(self): self.type = self.type or 'git' def __repr__(self): return "".format(self) def __str__(self): pattern = "['{t}'] '{r}' @ '{v}' in '{d}'" if self.link: pattern += " <- '{s}'" return pattern.format( t=self.type, r=self.repo, v=self.rev, d=self.name, s=self.link ) def __eq__(self, other): return self.name == other.name def __ne__(self, other): return self.name != other.name def __lt__(self, other): return self.name < other.name def update_files(self, force=False, fetch=False, clean=True, skip_changes=False): """Ensure the source matches the specified revision.""" log.info("Updating source files...") # Clone the repository if needed if not os.path.exists(self.name): git.clone( self.type, self.repo, self.name, sparse_paths=self.sparse_paths, rev=self.rev, ) # Enter the working tree shell.cd(self.name) if not git.valid(): raise self._invalid_repository # Check for uncommitted changes if not force: log.debug("Confirming there are no uncommitted changes...") if skip_changes: if git.changes( self.type, include_untracked=clean, display_status=False ): common.show( f'Skipped update due to uncommitted 
changes in {os.getcwd()}', color='git_changes', ) return else: if git.changes(self.type, include_untracked=clean): raise exceptions.UncommittedChanges( f'Uncommitted changes in {os.getcwd()}' ) # Fetch the desired revision if fetch or git.is_fetch_required(self.type, self.rev): git.fetch(self.type, self.repo, self.name, rev=self.rev) # Update the working tree to the desired revision git.update( self.type, self.repo, self.name, fetch=fetch, clean=clean, rev=self.rev ) def create_link(self, root, force=False): """Create a link from the target name to the current directory.""" if not self.link: return log.info("Creating a symbolic link...") if os.name == 'nt': warnings.warn("Symbolic links are not supported on Windows") return target = os.path.join(root, self.link) source = os.path.relpath(os.getcwd(), os.path.dirname(target)) if os.path.islink(target): os.remove(target) elif os.path.exists(target): if force: shell.rm(target) else: msg = "Preexisting link location at {}".format(target) raise exceptions.UncommittedChanges(msg) shell.ln(source, target) def run_scripts(self, force=False): log.info("Running install scripts...") # Enter the working tree shell.cd(self.name) if not git.valid(): raise self._invalid_repository # Check for scripts if not self.scripts: common.show("(no scripts to run)", color='shell_info') common.newline() return # Run all scripts for script in self.scripts: try: lines = shell.call(script, _shell=True) except exceptions.ShellError as exc: common.show(*exc.output, color='shell_error') cmd = exc.program if force: log.debug("Ignored error from call to '%s'", cmd) else: msg = "Command '{}' failed in {}".format(cmd, os.getcwd()) raise exceptions.ScriptFailure(msg) else: common.show(*lines, color='shell_output') common.newline() def identify(self, allow_dirty=True, allow_missing=True, skip_changes=False): """Get the path and current repository URL and hash.""" if os.path.isdir(self.name): shell.cd(self.name) if not git.valid(): raise 
self._invalid_repository path = os.getcwd() url = git.get_url(self.type) if git.changes( self.type, display_status=not allow_dirty and not skip_changes, _show=not skip_changes, ): if allow_dirty: common.show(self.DIRTY, color='git_dirty', log=False) common.newline() return path, url, self.DIRTY if skip_changes: msg = ("Skipped lock due to uncommitted changes " "in {}").format( os.getcwd() ) common.show(msg, color='git_changes') common.newline() return path, url, self.DIRTY msg = "Uncommitted changes in {}".format(os.getcwd()) raise exceptions.UncommittedChanges(msg) rev = git.get_hash(self.type, _show=True) common.show(rev, color='git_rev', log=False) common.newline() return path, url, rev if allow_missing: return os.getcwd(), '', self.UNKNOWN raise self._invalid_repository def lock(self, rev=None, allow_dirty=False, skip_changes=False): """Create a locked source object. Return a locked version of the current source if not dirty otherwise None. """ if rev is None: _, _, rev = self.identify( allow_dirty=allow_dirty, allow_missing=False, skip_changes=skip_changes ) if rev == self.DIRTY: return None source = self.__class__( self.type, self.repo, self.name, rev, self.link, self.scripts, self.sparse_paths, ) return source @property def _invalid_repository(self): path = os.path.join(os.getcwd(), self.name) msg = "Not a valid repository: {}".format(path) return exceptions.InvalidRepository(msg) @staticmethod def _infer_name(repo): filename = repo.split('/')[-1] name = filename.split('.')[0] return name PK!C" " gitman/plugin.py#!/usr/bin/env python3 """Plugin for Git.""" import argparse import logging from . 
import __version__, common from .cli import _get_command, _run_command PROG = 'git deps' DESCRIPTION = "Use GitMan (v{}) to install repositories.".format(__version__) log = logging.getLogger(__name__) def main(args=None): """Process command-line arguments and run the Git plugin.""" # Main parser parser = argparse.ArgumentParser(prog=PROG, description=DESCRIPTION) parser.add_argument( '-f', '--force', action='store_true', help="overwrite uncommitted changes in dependencies", ) parser.add_argument( '-s', '--skip-changes', action='store_true', dest='skip_changes', help="skip dependencies with uncommitted changes", ) parser.add_argument( '-c', '--clean', action='store_true', help="delete ignored files when updating dependencies", ) # Options group group = parser.add_mutually_exclusive_group() # Update option group.add_argument( '-u', '--update', const='update', help="update dependencies to the latest versions", action='store_const', dest='command', ) parser.add_argument( '-a', '--all', action='store_true', dest='recurse', help="include nested dependencies when updating", ) parser.add_argument( '-L', '--skip-lock', action='store_false', dest='lock', default=True, help="disable recording of updated versions", ) # Display option group.add_argument( '-l', '--list', const='list', help="display the current version of each dependency", action='store_const', dest='command', ) # Uninstall option group.add_argument( '-x', '--uninstall', const='uninstall', help="delete all installed dependencies", action='store_const', dest='command', ) parser.add_argument( '-k', '--keep-location', action='store_true', dest='keep_location', default=False, help='keep top level folder location', ) # Parse arguments namespace = parser.parse_args(args=args) # Modify arguments to match CLI interface if not namespace.command: namespace.command = 'install' namespace.name = [] namespace.root = None namespace.depth = None namespace.allow_dirty = True namespace.fetch = True # Configure logging 
common.configure_logging() # Run the program function, args, kwargs = _get_command(None, namespace) _run_command(function, args, kwargs) if __name__ == '__main__': # pragma: no cover (manual test) main() PK! gitman/settings.py"""Program defaults.""" import logging import os # Cache settings CACHE = os.path.expanduser(os.getenv('GITMAN_CACHE', "~/.gitcache")) CACHE_DISABLE = bool(os.getenv('GITMAN_CACHE_DISABLE')) # Logging settings DEFAULT_LOGGING_FORMAT = "%(message)s" LEVELED_LOGGING_FORMAT = "%(levelname)s: %(message)s" VERBOSE_LOGGING_FORMAT = "[%(levelname)-8s] %(message)s" VERBOSE2_LOGGING_FORMAT = "[%(levelname)-8s] (%(name)s @%(lineno)4d) %(message)s" QUIET_LOGGING_LEVEL = logging.ERROR DEFAULT_LOGGING_LEVEL = logging.WARNING VERBOSE_LOGGING_LEVEL = logging.INFO VERBOSE2_LOGGING_LEVEL = logging.DEBUG LOGGING_DATEFMT = "%Y-%m-%d %H:%M" # 3rd party settings YORM_LOGGING_LEVEL = logging.WARNING SH_LOGGING_LEVEL = logging.WARNING PK!/*Մ gitman/shell.py"""Utilities to call shell programs.""" import logging import os import subprocess from . import common from .exceptions import ShellError CMD_PREFIX = "$ " OUT_PREFIX = "> " log = logging.getLogger(__name__) def call(name, *args, _show=True, _shell=False, _ignore=False): """Call a program with arguments. 
:param name: name of program to call :param args: list of command-line arguments :param _show: display the call on stdout :param _shell: force executing the program into a real shell a Windows shell command (i.e: dir, echo) needs a real shell but not a regular program (i.e: calc, git) :param _ignore: ignore non-zero return codes """ program = show(name, *args, stdout=_show) command = subprocess.run( name if _shell else [name, *args], universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=_shell, ) output = [line.strip() for line in command.stdout.splitlines()] for line in output: log.debug(OUT_PREFIX + line) if command.returncode == 0: return output if _ignore: log.debug("Ignored error from call to '%s'", name) return output message = ( "An external program call failed." + "\n\n" "In working directory: " + os.getcwd() + "\n\n" "The following command produced a non-zero return code:" + "\n\n" + CMD_PREFIX + program + "\n" + command.stdout.strip() ) raise ShellError(message, program=program, output=output) def mkdir(path): if not os.path.exists(path): if os.name == 'nt': call("mkdir " + path, _shell=True) else: call('mkdir', '-p', path) def cd(path, _show=True): if os.name == 'nt': show('cd', '/D', path, stdout=_show) else: show('cd', path, stdout=_show) os.chdir(path) def ln(source, target): if os.name == 'nt': log.warning("Symlinks are not supported on Windows") else: dirpath = os.path.dirname(target) if not os.path.isdir(dirpath): mkdir(dirpath) call('ln', '-s', source, target) def rm(path): if os.name == 'nt': if os.path.isfile(path): call("del /Q /F " + path, _shell=True) elif os.path.isdir(path): call("rmdir /Q /S " + path, _shell=True) else: call('rm', '-rf', path) def show(name, *args, stdout=True): program = ' '.join([name, *args]) if stdout: common.show(CMD_PREFIX + program, color='shell') else: log.debug(CMD_PREFIX + program) return program PK!?lgitman/system.py"""Interface to the operating system.""" import logging import os 
import platform import subprocess log = logging.getLogger(__name__) def launch(path): """Open a file with its default program.""" name = platform.system() log.info("Opening %s", path) try: function = { 'Windows': _launch_windows, 'Darwin': _launch_mac, 'Linux': _launch_linux, }[name] except KeyError: raise RuntimeError("Unrecognized platform: {}".format(name)) from None else: return function(path) def _launch_windows(path): # pragma: no cover (manual test) # pylint: disable=no-member os.startfile(path) # type: ignore return True def _launch_mac(path): # pragma: no cover (manual test) return subprocess.call(['open', path]) == 0 def _launch_linux(path): # pragma: no cover (manual test) return subprocess.call(['xdg-open', path]) == 0 PK!ޙPb""gitman/tests/__init__.py"""Unit tests for the package.""" PK!κgitman/tests/conftest.py"""Unit test configuration file.""" import logging import os import pytest import yorm ENV = 'TEST_INTEGRATION' # environment variable to enable integration tests REASON = "'{0}' variable not set".format(ENV) ROOT = os.path.dirname(__file__) FILES = os.path.join(ROOT, 'files') def pytest_configure(config): """Conigure logging and silence verbose test runner output.""" logging.basicConfig( level=logging.DEBUG, format="[%(levelname)-8s] (%(name)s @%(lineno)4d) %(message)s", ) logging.getLogger('yorm').setLevel(logging.WARNING) terminal = config.pluginmanager.getplugin('terminal') class QuietReporter(terminal.TerminalReporter): # type: ignore def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.verbosity = 0 self.showlongtestinfo = False self.showfspath = False terminal.TerminalReporter = QuietReporter def pytest_runtest_setup(item): """Disable YORM file storage during unit tests.""" if 'integration' in item.keywords: if not os.getenv(ENV): pytest.skip(REASON) else: yorm.settings.fake = False else: yorm.settings.fake = True PK!=Ȳa  !gitman/tests/files/gdm-custom.ymllocation: dependencies sources: - repo: 
https://github.com/jacebrowning/gitman type: git name: gitman_1 rev: fb693447579235391a45ca170959b5583c5042d8 link: src/gitman_a - repo: https://github.com/jacebrowning/gitman type: git name: gitman_2 rev: master link: src/gitman_b PK!.?3H"gitman/tests/files/gdm-default.ymllocation: gitman_modules sources: - repo: https://github.com/jacebrowning/gitman type: git name: gitman_1 rev: fb693447579235391a45ca170959b5583c5042d8 PK!wA55gitman/tests/files/gitman.ymllocation: ../../../tmp sources: - name: gitman_1 type: git link: '' repo: https://github.com/jacebrowning/gitman-demo rev: example-branch - name: gitman_2 type: git link: '' repo: https://github.com/jacebrowning/gitman-demo rev: example-tag - name: gitman_3 type: git link: '' repo: https://github.com/jacebrowning/gitman-demo rev: master@{2015-06-18 11:11:11} sources_locked: - name: gitman_1 type: git link: '' repo: https://github.com/jacebrowning/gitman-demo rev: eb37743011a398b208dd9f9ef79a408c0fc10d48 - name: gitman_2 type: git link: '' repo: https://github.com/jacebrowning/gitman-demo rev: 7bd138fe7359561a8c2ff9d195dff238794ccc04 - name: gitman_3 type: git link: '' repo: https://github.com/jacebrowning/gitman-demo rev: 2da24fca34af3748e3cab61db81a2ae8b35aec94 PK!hVw/w/gitman/tests/test_cli.py# pylint: disable=no-self-use,unused-variable,expression-not-assigned import logging from unittest.mock import Mock, patch import pytest from expecter import expect from gitman import cli from gitman.common import _Config from gitman.exceptions import ScriptFailure, UncommittedChanges class TestMain: """Unit tests for the top-level arguments.""" def test_main(self): """Verify the top-level command can be run.""" mock_function = Mock(return_value=True) cli.main([], mock_function) mock_function.assert_called_once_with() def test_main_fail(self): """Verify error in commands are detected.""" with pytest.raises(SystemExit): cli.main([], Mock(return_value=False)) def test_main_help(self): """Verify the help text can be 
displayed.""" with pytest.raises(SystemExit): cli.main(['--help']) def test_main_none(self): """Verify it's an error to specify no command.""" with pytest.raises(SystemExit): cli.main([]) def test_main_interrupt(self): """Verify a command can be interrupted.""" with pytest.raises(SystemExit): cli.main([], Mock(side_effect=KeyboardInterrupt)) def test_main_error(self): """Verify runtime errors are handled.""" with pytest.raises(SystemExit): cli.main([], Mock(side_effect=UncommittedChanges)) with pytest.raises(SystemExit): cli.main([], Mock(side_effect=ScriptFailure)) class TestInit: """Unit tests for the `init` command.""" @patch('gitman.commands.init') def test_install(self, mock_init): """Verify the 'install' command can be run.""" cli.main(['init']) mock_init.assert_called_once_with() class TestInstall: """Unit tests for the `install` command.""" @patch('gitman.commands.install') def test_install(self, mock_install): """Verify the 'install' command can be run.""" cli.main(['install']) mock_install.assert_called_once_with( root=None, depth=5, force=False, fetch=False, clean=False, skip_changes=False, ) @patch('gitman.commands.install') def test_install_root(self, mock_install): """Verify the project's root can be specified.""" cli.main(['install', '--root', 'mock/path/to/root']) mock_install.assert_called_once_with( root='mock/path/to/root', depth=5, force=False, fetch=False, clean=False, skip_changes=False, ) @patch('gitman.commands.install') def test_install_force(self, mock_install): """Verify dependencies can be force-installed.""" cli.main(['install', '--force']) mock_install.assert_called_once_with( root=None, depth=5, force=True, fetch=False, clean=False, skip_changes=False ) @patch('gitman.commands.install') def test_install_fetch(self, mock_install): """Verify fetching can be enabled.""" cli.main(['install', '--fetch']) mock_install.assert_called_once_with( root=None, depth=5, force=False, fetch=True, clean=False, skip_changes=False ) 
@patch('gitman.commands.install') def test_install_clean(self, mock_install): """Verify dependency cleaning can be enabled.""" cli.main(['install', '--clean']) mock_install.assert_called_once_with( root=None, depth=5, force=False, fetch=False, clean=True, skip_changes=False ) @patch('gitman.commands.install') def test_install_specific_sources(self, mock_install): """Verify individual dependencies can be installed.""" cli.main(['install', 'foo', 'bar']) mock_install.assert_called_once_with( 'foo', 'bar', root=None, depth=5, force=False, fetch=False, clean=False, skip_changes=False, ) @patch('gitman.commands.install') def test_install_with_depth(self, mock_update): """Verify the 'install' command can be limited by depth.""" cli.main(['install', '--depth', '10']) mock_update.assert_called_once_with( root=None, depth=10, force=False, fetch=False, clean=False, skip_changes=False, ) @patch('gitman.commands.install', Mock()) def test_install_with_depth_invalid(self): """Verify depths below 1 are rejected.""" with pytest.raises(SystemExit): cli.main(['install', '--depth', '0']) with pytest.raises(SystemExit): cli.main(['install', '--depth', '-1']) class TestUpdate: """Unit tests for the `update` command.""" @patch('gitman.commands.update') def test_update(self, mock_update): """Verify the 'update' command can be run.""" cli.main(['update']) mock_update.assert_called_once_with( root=None, depth=5, force=False, clean=False, recurse=False, lock=None, skip_changes=False, ) @patch('gitman.commands.update') def test_update_recursive(self, mock_update): """Verify the 'update' command can be run recursively.""" cli.main(['update', '--all']) mock_update.assert_called_once_with( root=None, depth=5, force=False, clean=False, recurse=True, lock=None, skip_changes=False, ) @patch('gitman.commands.update') def test_update_no_lock(self, mock_update): """Verify the 'update' command can disable locking.""" cli.main(['update', '--skip-lock']) mock_update.assert_called_once_with( root=None, 
depth=5, force=False, clean=False, recurse=False, lock=False, skip_changes=False, ) @patch('gitman.commands.update') def test_update_skip_changes(self, mock_update): """Verify the 'update' command with skip changes option.""" cli.main(['update', '--skip-changes']) mock_update.assert_called_once_with( root=None, depth=5, force=False, clean=False, recurse=False, lock=None, skip_changes=True, ) @patch('gitman.commands.update') def test_update_specific_sources(self, mock_install): """Verify individual dependencies can be installed.""" cli.main(['update', 'foo', 'bar']) mock_install.assert_called_once_with( 'foo', 'bar', root=None, depth=5, force=False, clean=False, recurse=False, lock=None, skip_changes=False, ) @patch('gitman.commands.update') def test_update_with_depth(self, mock_update): """Verify the 'update' command can be limited by depth.""" cli.main(['update', '--depth', '10']) mock_update.assert_called_once_with( root=None, depth=10, force=False, clean=False, recurse=False, lock=None, skip_changes=False, ) class TestList: """Unit tests for the `list` command.""" @patch('gitman.commands.display') def test_list(self, mock_display): """Verify the 'list' command can be run.""" cli.main(['list']) mock_display.assert_called_once_with(root=None, depth=5, allow_dirty=True) @patch('gitman.commands.display') def test_list_root(self, mock_display): """Verify the project's root can be specified.""" cli.main(['list', '--root', 'mock/path/to/root']) mock_display.assert_called_once_with( root='mock/path/to/root', depth=5, allow_dirty=True ) @patch('gitman.commands.display') def test_list_no_dirty(self, mock_display): """Verify the 'list' command can be set to fail when dirty.""" cli.main(['list', '--fail-if-dirty']) mock_display.assert_called_once_with(root=None, depth=5, allow_dirty=False) @patch('gitman.commands.display') def test_update_with_depth(self, mock_update): """Verify the 'list' command can be limited by depth.""" cli.main(['list', '--depth', '10']) 
mock_update.assert_called_once_with(root=None, depth=10, allow_dirty=True) def describe_lock(): @patch('gitman.commands.lock') def with_no_arguments(lock): cli.main(['lock']) lock.assert_called_once_with(root=None) @patch('gitman.commands.lock') def with_dependencies(lock): cli.main(['lock', 'foo', 'bar']) lock.assert_called_once_with('foo', 'bar', root=None) class TestUninstall: """Unit tests for the `uninstall` command.""" @patch('gitman.commands.delete') def test_uninstall(self, mock_uninstall): """Verify the 'uninstall' command can be run.""" cli.main(['uninstall']) mock_uninstall.assert_called_once_with( root=None, force=False, keep_location=False ) @patch('gitman.commands.delete') def test_uninstall_root(self, mock_uninstall): """Verify the project's root can be specified.""" cli.main(['uninstall', '--root', 'mock/path/to/root']) mock_uninstall.assert_called_once_with( root='mock/path/to/root', force=False, keep_location=False ) @patch('gitman.commands.delete') def test_uninstall_force(self, mock_uninstall): """Verify the 'uninstall' command can be forced.""" cli.main(['uninstall', '--force']) mock_uninstall.assert_called_once_with( root=None, force=True, keep_location=False ) @patch('gitman.commands.delete') def test_uninstall_keep_location(self, mock_uninstall): """Verify the 'uninstall' command can be run with keep_location.""" cli.main(['uninstall', '--keep-location']) mock_uninstall.assert_called_once_with( root=None, force=False, keep_location=True ) def describe_show(): @patch('gitman.commands.show') def with_no_arguments(show): cli.main(['show']) show.assert_called_once_with(root=None) @patch('gitman.commands.show') def with_root(show): cli.main(['show', '--root', "mock/root"]) show.assert_called_once_with(root="mock/root") @patch('gitman.commands.show') def with_names(show): cli.main(['show', 'foo', 'bar']) show.assert_called_once_with('foo', 'bar', root=None) @patch('gitman.commands.show') def with_config(show): cli.main(['show', '--config']) 
show.assert_called_once_with('__config__', root=None) @patch('gitman.commands.show') def with_log(show): cli.main(['show', '--log']) show.assert_called_once_with('__log__', root=None) def describe_edit(): @patch('gitman.commands.edit') def with_no_arguments(edit): cli.main(['edit']) edit.assert_called_once_with(root=None) @patch('gitman.commands.edit') def with_root(edit): cli.main(['edit', '--root', "mock/root"]) edit.assert_called_once_with(root="mock/root") def describe_logging(): argument_verbosity = [ (None, 0), ('-v', 1), ('-vv', 2), ('-vvv', 3), ('-vvvv', 4), ('-vvvvv', 4), ('-q', -1), ] @pytest.mark.parametrize("argument,verbosity", argument_verbosity) def at_each_level(argument, verbosity): def function(*args, **kwargs): logging.debug(args) logging.debug(kwargs) logging.warning("warning") logging.error("error") return True cli.main([argument] if argument else [], function) expect(_Config.verbosity) == verbosity PK!^^gitman/tests/test_commands.py# pylint: disable=redefined-outer-name,unused-argument,unused-variable,singleton-comparison,expression-not-assigned from expecter import expect from gitman import commands def describe_install(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.install()) == False def describe_update(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.update()) == False def describe_display(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.display()) == False def describe_lock(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.lock()) == False def describe_delete(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.delete()) == False def describe_show(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.show()) == False def describe_edit(): def can_be_run_without_project(tmpdir): tmpdir.chdir() expect(commands.show()) == False PK!Wo gitman/tests/test_common.py# pylint: 
disable=attribute-defined-outside-init # pylint: disable=unused-variable,expression-not-assigned from unittest.mock import Mock, call from expecter import expect from gitman import common from gitman.common import _Config class TestShowConsole: def setup_method(self, _): _Config.indent_level = 0 _Config.verbosity = 0 self.file = Mock() def test_show(self): common.show("Hello, world!", file=self.file, color=None) assert [call.write("Hello, world!"), call.write("\n")] == self.file.mock_calls def test_show_after_indent(self): common.indent() common.show("|\n", file=self.file, color=None) assert [call.write(" |\n"), call.write("\n")] == self.file.mock_calls def test_show_after_1_indent_2_dedent(self): common.indent() common.dedent() common.dedent() common.show("|\n", file=self.file, color=None) assert [call.write("|\n"), call.write("\n")] == self.file.mock_calls class TestShowLog: def setup_method(self, _): _Config.indent_level = 0 _Config.verbosity = 1 self.log = Mock() def test_show(self): common.show("Hello, world!", log=self.log, color=None) assert [call.info("Hello, world!")] == self.log.mock_calls def test_show_errors(self): common.show("Oops", color='error', log=self.log) expect(self.log.mock_calls) == [call.error("Oops")] def test_show_after_indent(self): common.indent() common.show("|\n", log=self.log, color=None) assert [call.info("|")] == self.log.mock_calls def test_show_after_1_indent_2_dedent(self): common.indent() common.dedent() common.dedent() common.show("|\n", log=self.log, color=None) assert [call.info("|")] == self.log.mock_calls class TestShowQuiet: def setup_method(self, _): _Config.indent_level = 0 _Config.verbosity = -1 self.file = Mock() self.log = Mock() def test_show(self): common.show("Hello, world!", file=self.file, log=self.log, color=None) assert [] == self.file.mock_calls assert [] == self.log.mock_calls def describe_show(): def it_requries_color_with_messages(): with expect.raises(AssertionError): common.show("Hello, world!", 'foobar') 
def describe_style(): def when_no_color_support(): msg = common.style("_foo_") expect(msg) == "_foo_" def when_no_message(): msg = common.style("", _color_support=True) expect(msg) == "" def when_shell(): msg = common.style("$ _foo_", 'shell', _color_support=True) expect(msg) == "\x1b[1m\x1b[32m$ \x1b[0m_foo_" def when_color(): msg = common.style("_foo_", 'message', _color_support=True) expect(msg) == "\x1b[1m\x1b[37m_foo_\x1b[0m" def when_unknown_color(): with expect.raises(AssertionError): common.style("_foo_", 'bar', _color_support=True) PK!gitman/tests/test_git.py# pylint: disable=no-self-use import os from unittest.mock import Mock, patch from gitman import git, settings from gitman.exceptions import ShellError from .utils import check_calls @patch('gitman.git.call') class TestGit: """Tests for calls to Git.""" @patch('os.path.isdir', Mock(return_value=False)) def test_clone(self, mock_call): """Verify the commands to set up a new reference repository.""" git.clone('git', 'mock.git', 'mock/path', cache='cache') check_calls( mock_call, [ "git clone --mirror mock.git " + os.path.normpath("cache/mock.reference"), "git clone --reference " + os.path.normpath("cache/mock.reference") + " mock.git " + os.path.normpath("mock/path"), ], ) @patch('os.path.isdir', Mock(return_value=False)) def test_clone_without_cache(self, mock_call): """Verify the commands to clone a repository.""" settings.CACHE_DISABLE = True try: git.clone('git', 'mock.git', 'mock/path', cache='cache') check_calls( mock_call, ["git clone mock.git " + os.path.normpath("mock/path")] ) finally: settings.CACHE_DISABLE = False @patch('os.path.isdir', Mock(return_value=True)) def test_clone_from_reference(self, mock_call): """Verify the commands to clone a Git repository from a reference.""" git.clone('git', 'mock.git', 'mock/path', cache='cache') check_calls( mock_call, [ "git clone --reference " + os.path.normpath("cache/mock.reference") + " mock.git " + os.path.normpath("mock/path") ], ) def 
test_fetch(self, mock_call): """Verify the commands to fetch from a Git repository.""" git.fetch('git', 'mock.git', 'mock/path') check_calls( mock_call, [ "git remote set-url origin mock.git", "git fetch --tags --force --prune origin", ], ) def test_fetch_rev(self, mock_call): """Verify the commands to fetch from a Git repository w/ rev.""" git.fetch('git', 'mock.git', 'mock/path', 'mock-rev') check_calls( mock_call, [ "git remote set-url origin mock.git", "git fetch --tags --force --prune origin mock-rev", ], ) def test_fetch_rev_sha(self, mock_call): """Verify the commands to fetch from a Git repository w/ SHA.""" git.fetch('git', 'mock.git', 'mock/path', 'abcdef1234' * 4) check_calls( mock_call, [ "git remote set-url origin mock.git", "git fetch --tags --force --prune origin", ], ) def test_fetch_rev_revparse(self, mock_call): """Verify the commands to fetch from a Git repository w/ rev-parse.""" git.fetch('git', 'mock.git', 'mock/path', 'master@{2015-02-12 18:30:00}') check_calls( mock_call, [ "git remote set-url origin mock.git", "git fetch --tags --force --prune origin", ], ) def test_valid(self, mock_call): """Verify the commands to check for a working tree.""" git.valid() check_calls(mock_call, ["git rev-parse --is-inside-work-tree"]) def test_changes(self, mock_call): """Verify the commands to check for uncommitted changes.""" git.changes('git', include_untracked=True) check_calls( mock_call, [ # based on: http://stackoverflow.com/questions/3878624 "git update-index -q --refresh", "git diff-index --quiet HEAD", "git ls-files --others --exclude-standard", "git status", # used for displaying the overall status ], ) def test_changes_false(self, _): """Verify the absence of changes can be detected.""" with patch('gitman.git.call', Mock(return_value=[""])): assert False is git.changes('git') def test_changes_false_with_untracked(self, _): """Verify untracked files can be detected.""" with patch('gitman.git.call', Mock(return_value=["file_1"])): assert False is 
git.changes('git') def test_changes_true_when_untracked_included(self, _): """Verify untracked files can be detected.""" with patch('gitman.git.call', Mock(return_value=["file_1"])): assert True is git.changes('git', include_untracked=True) def test_changes_true_when_uncommitted(self, _): """Verify uncommitted changes can be detected.""" with patch('gitman.git.call', Mock(side_effect=ShellError)): assert True is git.changes('git', display_status=False) def test_update(self, mock_call): """Verify the commands to update a working tree to a revision.""" git.update('git', 'mock.git', 'mock/path', rev='mock_rev') check_calls( mock_call, [ "git stash", "git clean --force -d -x", "git checkout --force mock_rev", "git branch --set-upstream-to origin/mock_rev", ], ) def test_update_branch(self, mock_call): """Verify the commands to update a working tree to a branch.""" git.update('git', 'mock.git', 'mock/path', fetch=True, rev='mock_branch') check_calls( mock_call, [ "git stash", "git clean --force -d -x", "git checkout --force mock_branch", "git branch --set-upstream-to origin/mock_branch", "git pull --ff-only --no-rebase", ], ) def test_update_no_clean(self, mock_call): git.update('git', 'mock.git', 'mock/path', clean=False, rev='mock_rev') check_calls( mock_call, [ "git stash", "git checkout --force mock_rev", "git branch --set-upstream-to origin/mock_rev", ], ) def test_update_revparse(self, mock_call): """Verify the commands to update a working tree to a rev-parse.""" mock_call.return_value = ["abc123"] git.update( 'git', 'mock.git', 'mock/path', rev='mock_branch@{2015-02-12 18:30:00}' ) check_calls( mock_call, [ "git stash", "git clean --force -d -x", "git checkout --force mock_branch", ( "git rev-list -n 1 --before='2015-02-12 18:30:00' " "--first-parent mock_branch" ), "git checkout --force abc123", "git branch --set-upstream-to origin/abc123", ], ) def test_get_url(self, mock_call): """Verify the commands to get the current repository's URL.""" git.get_url('git') 
check_calls(mock_call, ["git config --get remote.origin.url"]) def test_get_hash(self, mock_call): """Verify the commands to get the working tree's hash.""" git.get_hash('git') check_calls(mock_call, ["git rev-parse HEAD"]) def test_get_tag(self, mock_call): """Verify the commands to get the working tree's tag.""" git.get_tag() check_calls(mock_call, ["git describe --tags --exact-match"]) def test_get_branch(self, mock_call): """Verify the commands to get the working tree's branch.""" git.get_branch() check_calls(mock_call, ["git rev-parse --abbrev-ref HEAD"]) PK!= !"gitman/tests/test_models_config.py# pylint: disable=no-self-use,redefined-outer-name,unused-variable,expression-not-assigned,misplaced-comparison-constant,len-as-condition import os import pytest from expecter import expect from gitman.models import Config, load_config from .conftest import FILES class TestConfig: def test_init_defaults(self): """Verify a config has a default filename and location.""" config = Config('mock/root') assert 'mock/root' == config.root assert 'gitman.yml' == config.filename assert 'gitman_sources' == config.location assert [] == config.sources def test_init_filename(self): """Verify the filename can be customized.""" config = Config('mock/root', 'mock.custom') assert 'mock.custom' == config.filename assert 'gitman_sources' == config.location def test_init_location(self): """Verify the location can be customized.""" config = Config('mock/root', location='.gitman') assert 'gitman.yml' == config.filename assert '.gitman' == config.location def test_path(self): """Verify the path is correct.""" config = Config('mock/root') assert os.path.normpath("mock/root/gitman.yml") == config.path @pytest.mark.integration def test_install_and_list(self): """Verify the correct dependencies are installed.""" config = Config(FILES) count = config.install_dependencies() assert 7 == count deps = list(config.get_dependencies()) assert 7 == len(deps) assert 
'1de84ca1d315f81b035cd7b0ecf87ca2025cdacd' == deps[0][2] assert '050290bca3f14e13fd616604202b579853e7bfb0' == deps[1][2] assert 'fb693447579235391a45ca170959b5583c5042d8' == deps[2][2] # master branch always changes --------------------- deps[3][2] # master branch always changes --------------------- deps[4][2] assert '7bd138fe7359561a8c2ff9d195dff238794ccc04' == deps[5][2] assert '2da24fca34af3748e3cab61db81a2ae8b35aec94' == deps[6][2] assert 5 == len(list(config.get_dependencies(depth=2))) assert 3 == len(list(config.get_dependencies(depth=1))) assert 0 == len(list(config.get_dependencies(depth=0))) @pytest.mark.integration def test_install_with_dirs(self): """Verify the dependency list can be filtered.""" config = Config(FILES) count = config.install_dependencies('gitman_2', 'gitman_3') assert 2 == count def test_install_with_dirs_unknown(self): """Verify zero dependencies are installed with unknown dependency.""" config = Config(FILES) count = config.install_dependencies('foobar') assert 0 == count def test_install_with_depth_0(self): """Verify an install depth of 0 installs nothing.""" config = Config(FILES) count = config.install_dependencies(depth=0) assert 0 == count @pytest.mark.integration def test_install_with_depth_1(self): """Verify an install depth of 1 installs the direct dependencies.""" config = Config(FILES) count = config.install_dependencies(depth=1) assert 3 == count @pytest.mark.integration def test_install_with_depth_2(self): """Verify an install depth of 2 installs 1 level of nesting.""" config = Config(FILES) count = config.install_dependencies(depth=2) assert 5 == count def describe_config(): @pytest.fixture def config(): return Config('m/root', 'm.ext', 'm/location') def describe_get_path(): def it_defaults_to_sources_location(config): expect(config.get_path()) == os.path.normpath("m/root/m/location") def it_can_get_the_config_path(config): expect(config.get_path('__config__')) == os.path.normpath("m/root/m.ext") def 
it_can_get_log_path(config): expect(config.get_path('__log__')) == os.path.normpath( "m/root/m/location/gitman.log" ) def it_can_get_dependency_path(config): expect(config.get_path('foobar')) == os.path.normpath( "m/root/m/location/foobar" ) class TestLoad: def test_load_from_directory_with_config_file(self): config = load_config(FILES) assert None is not config def test_load_from_directory_without_config_file(self, tmpdir): tmpdir.chdir() config = load_config() assert None is config PK! "gitman/tests/test_models_source.py# pylint: disable=no-self-use,redefined-outer-name,misplaced-comparison-constant from copy import copy from unittest.mock import Mock, patch import pytest from gitman.models import Source @pytest.fixture def source(): return Source('git', 'repo', 'name', rev='rev', link='link') class TestSource: def test_init_defaults(self): """Verify a source has a default revision.""" source = Source('git', 'http://example.com/foo/bar.git') assert 'http://example.com/foo/bar.git' == source.repo assert 'bar' == source.name assert 'master' == source.rev assert None is source.link def test_init_rev(self): """Verify the revision can be customized.""" source = Source('git', 'http://mock.git', 'mock_name', 'v1.0') assert 'v1.0' == source.rev def test_init_link(self): """Verify the link can be set.""" source = Source('git', 'http://mock.git', 'mock_name', link='mock/link') assert 'mock/link' == source.link def test_init_error(self): """Verify the repository, name, and rev are required.""" with pytest.raises(ValueError): Source('git', '', name='mock_name', rev='master') with pytest.raises(ValueError): Source('git', 'http://mock.git', name='', rev='master') with pytest.raises(ValueError): Source('git', 'http://mock.git', name='mock_name', rev='') def test_repr(self, source): """Verify sources can be represented.""" assert "" == repr(source) def test_repr_no_link(self, source): """Verify sources can be represented.""" source.link = None assert "" == repr(source) def 
test_eq(self, source): source2 = copy(source) assert source == source2 source2.name = "dir2" assert source != source2 def test_lt(self): sources = [ Source('git', 'http://github.com/owner/123.git'), Source('git', 'bbb', name='456'), Source('git', 'ccc', '456'), Source('git', 'BBB', 'AAA'), Source('git', 'AAA', 'AAA'), ] assert sources == sorted(sources) def test_identify_missing(self, source, tmpdir): """Verify a missing source identifies as unknown.""" tmpdir.chdir() with patch('os.path.isdir', Mock(return_value=False)): assert (str(tmpdir), '', '') == source.identify() def test_lock_uses_the_identity_rev(self, source): source.identify = Mock(return_value=('path2', 'dir2', 'abc123')) source2 = source.lock() assert 'abc123' == source2.rev assert 'name' == source2.name PK!<[ ^^gitman/tests/test_plugin.py# pylint: disable=no-self-use from unittest.mock import call, patch from gitman import plugin class TestMain: """Unit tests for the top-level arguments.""" @patch('gitman.cli.commands') def test_install(self, mock_commands): """Verify 'install' is the default command.""" mock_commands.install.__name__ = 'mock' plugin.main([]) assert [ call.install( root=None, depth=None, clean=False, fetch=True, force=False, skip_changes=False, ), call.install().__bool__(), # command status check ] == mock_commands.mock_calls @patch('gitman.cli.commands') def test_update(self, mock_commands): """Verify 'update' can be called with cleaning.""" mock_commands.update.__name__ = 'mock' plugin.main(['--update', '--clean']) assert [ call.update( root=None, depth=None, clean=True, force=False, recurse=False, lock=True, skip_changes=False, ), call.update().__bool__(), # command status check ] == mock_commands.mock_calls @patch('gitman.cli.commands') def test_update_recursive(self, mock_commands): """Verify 'update' can be called recursively.""" mock_commands.update.__name__ = 'mock' plugin.main(['--update', '--all']) assert [ call.update( root=None, depth=None, clean=False, force=False, 
recurse=True, lock=True, skip_changes=False, ), call.update().__bool__(), # command status check ] == mock_commands.mock_calls @patch('gitman.cli.commands') def test_update_no_lock(self, mock_commands): """Verify 'update' can be called without locking.""" mock_commands.update.__name__ = 'mock' plugin.main(['--update', '--skip-lock']) assert [ call.update( root=None, depth=None, clean=False, force=False, recurse=False, lock=False, skip_changes=False, ), call.update().__bool__(), # command status check ] == mock_commands.mock_calls @patch('gitman.cli.commands') def test_update_skip_changes(self, mock_commands): """Verify the 'update' command with skip changes option.""" mock_commands.update.__name__ = 'mock' plugin.main(['--update', '--skip-changes']) assert [ call.update( root=None, depth=None, clean=False, force=False, recurse=False, lock=True, skip_changes=True, ), call.update().__bool__(), # command status check ] == mock_commands.mock_calls @patch('gitman.cli.commands') def test_list(self, mock_commands): """Verify 'list' can be called.""" mock_commands.display.__name__ = 'mock' plugin.main(['--list']) assert [ call.display(root=None, depth=None, allow_dirty=True), call.display().__bool__(), # command status check ] == mock_commands.mock_calls @patch('gitman.cli.commands') def test_uninstall(self, mock_commands): """Verify 'clean' can be called with force.""" mock_commands.delete.__name__ = 'mock' plugin.main(['--uninstall', '--force']) assert [ call.delete(root=None, force=True, keep_location=False), call.delete().__bool__(), # command status check ] == mock_commands.mock_calls PK!AS1 1 gitman/tests/test_shell.py# pylint: disable=no-self-use,misplaced-comparison-constant,expression-not-assigned import os from unittest.mock import Mock, patch import pytest from expecter import expect from gitman import shell from gitman.exceptions import ShellError from .utils import check_calls class TestCall: """Tests for interacting with the shell.""" def 
test_other_error_uncaught(self): """Verify program errors raise exceptions.""" with pytest.raises(ShellError): shell.call('git', '--invalid-git-argument') def test_other_error_ignored(self): """Verify program errors can be ignored.""" shell.call('git', '--invalid-git-argument', _ignore=True) def test_other_capture(self): """Verify a program's output can be captured.""" if os.name == 'nt': lines = shell.call('echo Hello, world!', _shell=True) else: lines = shell.call('echo', 'Hello, world!') expect(lines) == ["Hello, world!"] @patch('gitman.shell.call') class TestPrograms: """Tests for calls to shell programs.""" def test_mkdir(self, mock_call): """Verify the commands to create directories.""" shell.mkdir('mock/dirpath') if os.name == 'nt': check_calls(mock_call, ["mkdir mock/dirpath"]) else: check_calls(mock_call, ["mkdir -p mock/dirpath"]) @patch('os.chdir') def test_cd(self, mock_chdir, mock_call): """Verify the commands to change directories.""" shell.cd('mock/dirpath') mock_chdir.assert_called_once_with('mock/dirpath') check_calls(mock_call, []) @patch('os.path.isdir', Mock(return_value=True)) def test_ln(self, mock_call): """Verify the commands to create symbolic links.""" shell.ln('mock/target', 'mock/source') if os.name == 'nt': check_calls(mock_call, []) else: check_calls(mock_call, ["ln -s mock/target mock/source"]) @patch('os.path.isdir', Mock(return_value=False)) @patch('os.path.exists', Mock(return_value=False)) def test_ln_missing_parent(self, mock_call): """Verify the commands to create symbolic links (missing parent).""" shell.ln('mock/target', 'mock/source') if os.name == 'nt': check_calls(mock_call, []) else: check_calls(mock_call, ["mkdir -p mock", "ln -s mock/target mock/source"]) @patch('os.path.isfile', Mock(return_value=True)) def test_rm_file(self, mock_call): """Verify the commands to delete files.""" shell.rm('mock/path') if os.name == 'nt': check_calls(mock_call, ["del /Q /F mock/path"]) else: check_calls(mock_call, ["rm -rf mock/path"]) 
@patch('os.path.isdir', Mock(return_value=True)) def test_rm_directory(self, mock_call): """Verify the commands to delete directories.""" shell.rm('mock/dirpath') if os.name == 'nt': check_calls(mock_call, ["rmdir /Q /S mock/dirpath"]) else: check_calls(mock_call, ["rm -rf mock/dirpath"]) PK!mgitman-1.6.dist-info/METADATAWr۶Sl'eiI:NӸMjJis"Wb`Pw$?I|o-:Q 'R'WQaS*QG7I4jJeǠD=mc1uK%"2ZYO'赮0n4sҴY62f6F¥7ڒw/\hSZ_4Add/:QZ9H/qJ7lFNB!pnȋ‘)+KkG 'iz%[" v(.[] Ylt xNʯ te멒v%wFƄĎ|܃ n~%NמQUЇq(;<}5xmz1rI>OA. ;H\s 8x ;Gχdo._E[6thbPn.l"6JrKm֎~{%ȆM|G[6?($(zΈQDV -:4XCa 0(g@%pȒ^r.J_ mɂ@1ZDDW%V;HBZ+Zi# DA]~u2 ܳ!ibgU)Ksioům;~PoA+5~7i&-䗹=țj gSs껲Ra}sPc/]" %NWF!5s8*ߏ yw|y~;Ї׌ߘAjMO{3Aе {q(zby s?yv{pi?_ !)TWw^u.O-dt0Ok<}FQn&sc:YfQy54ىu/"bQ@W-uk\3:is-iȞ x>:J;t5wd>U-Th!![{J-ߊFɻ-+ I`@cWL.E" dFe䃮 2?+ph8zRfpMre˙v2Q8OY+?ǏNu'7&- h%-f4@<YnHH,)qA>^Fou6Ӧ9+蔆7-\}LZ#jUr0t1Wp!}yqxٴ [˕k4̹] Lp+=6Z\d1$%60a'Vcp?9* ;@ph{n`\"n#`n/Q-vA.dk皬PK!H! gitman-1.6.dist-info/RECORDuǒ< TY7F*T X|qjnWV]5ǰS!8ot#Ø'**]}t$tAbs OڨHĬPJI6+3ޥqEe8&6/Oyz4 T0uUe{?|h` x~VI {n–'-ި%%2ےi8&.;ړ3P`qnBJ7ES}MK+7B' \3j$378rE%!-ɆU%h4yvy>\6>a"=瞀$7d6 YV h6W2?LI/Ӭ~aiR)'d!fBp:XRK.7n5ZM},wO= ?Z!:|~R?x6uѴԸsݐNl85p Q RTエ9 1w XwV )+;7'ʼn0?>zutu!gS53bJt)0`@W5f$ѩp&BPȧOs־AC+Ί'M؈h6l/rp=)yG3gH(iו%`?zؓfъJs'ǫoT~t2IoH!4#'|ɦ_Eڂ2}KnĪ9I8{`K6N{r1O%<_=%Y-.G̅5xS,gn2U٤kb QQ)fW6s;$j[y15fq6H_#e*.kXrĝz"~~XQ3> Q' E5,ۥetaǺVX}0LEǓ)LIAy.4?\wؽ,u|(IYZ(~0]4qRpL7 t^Jbv 8P0 ȰVfRS 8QS8~fԿGU\;`8#͖tqY.ֺ`(L/uD=?5`Í!\ƳZq*", ҡܑL9#^”s.ݞ_G{j" 8T%x?2}a|8ugzk5 @@Wr}D70׭N5ϤXr^gBJ#uC_u1o ׻!|*tbfD vUvl3lEdyK5]zX~Yϖ&?"';G/PsK+Bϰ'W^=>( +jz|6IqmAMp٩l5@?*omџ_-E+LnMb P[aP/<?PK!1gitman/__init__.pyPK!TNNDgitman/__main__.pyPK!B$$ gitman/cli.pyPK!·$$(gitman/commands.pyPK!Mgitman/common.pyPK!s(agitman/exceptions.pyPK!2Y dgitman/git.pyPK!-QQgitman/models/__init__.pyPK!$''Egitman/models/config.pyPK!S7cYgitman/models/source.pyPK!C" " )gitman/plugin.pyPK! 
ygitman/settings.pyPK!/*Մ gitman/shell.pyPK!?l1gitman/system.pyPK!ޙPb""gitman/tests/__init__.pyPK!κ]gitman/tests/conftest.pyPK!=Ȳa  !egitman/tests/files/gdm-custom.ymlPK!.?3H"gitman/tests/files/gdm-default.ymlPK!wA55gitman/tests/files/gitman.ymlPK!hVw/w/gitman/tests/test_cli.pyPK!^^gitman/tests/test_commands.pyPK!Wo X#gitman/tests/test_common.pyPK!x/gitman/tests/test_git.pyPK!= !"Ngitman/tests/test_models_config.pyPK! "`gitman/tests/test_models_source.pyPK!<[ ^^kgitman/tests/test_plugin.pyPK!AS1 1 g{gitman/tests/test_shell.pyPK!m~gitman-1.6.dist-info/METADATAPK!H! gitman-1.6.dist-info/RECORDPK""O ٞ