PKUN rZZcherry_picker/__init__.py"""Backport CPython changes from master to maintenance branches.""" __version__ = '1.3.0' PKUNh7$]]cherry_picker/__main__.pyfrom .cherry_picker import cherry_pick_cli if __name__ == '__main__': cherry_pick_cli() PKUNggcherry_picker/cherry_picker.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import click import collections import enum import os import subprocess import webbrowser import re import sys import requests import toml from gidgethub import sansio from . import __version__ CREATE_PR_URL_TEMPLATE = ("https://api.github.com/repos/" "{config[team]}/{config[repo]}/pulls") DEFAULT_CONFIG = collections.ChainMap({ 'team': 'python', 'repo': 'cpython', 'check_sha': '7f777ed95a19224294949e1b4ce56bbffcb1fe9f', 'fix_commit_msg': True, 'default_branch': 'master', }) WORKFLOW_STATES = enum.Enum( 'Workflow states', """ FETCHING_UPSTREAM FETCHED_UPSTREAM CHECKING_OUT_DEFAULT_BRANCH CHECKED_OUT_DEFAULT_BRANCH PUSHING_TO_REMOTE PUSHED_TO_REMOTE PUSHING_TO_REMOTE_FAILED PR_CREATING PR_OPENING REMOVING_BACKPORT_BRANCH REMOVING_BACKPORT_BRANCH_FAILED REMOVED_BACKPORT_BRANCH BACKPORT_STARTING BACKPORT_LOOPING BACKPORT_LOOP_START BACKPORT_LOOP_END BACKPORT_COMPLETE ABORTING ABORTED ABORTING_FAILED CONTINUATION_STARTED BACKPORTING_CONTINUATION_SUCCEED CONTINUATION_FAILED BACKPORT_PAUSED UNSET """, ) class BranchCheckoutException(Exception): pass class CherryPickException(Exception): pass class InvalidRepoException(Exception): pass class CherryPicker: ALLOWED_STATES = WORKFLOW_STATES.BACKPORT_PAUSED, WORKFLOW_STATES.UNSET """The list of states expected at the start of the app.""" def __init__(self, pr_remote, commit_sha1, branches, *, dry_run=False, push=True, prefix_commit=True, config=DEFAULT_CONFIG, chosen_config_path=None, ): self.chosen_config_path = chosen_config_path """The config reference used in the current runtime. It starts with a Git revision specifier, followed by a colon and a path relative to the repo root. """ self.config = config self.check_repo() # may raise InvalidRepoException self.initial_state = self.get_state_and_verify() """The runtime state loaded from the config. Used to verify that we resume the process from the valid previous state. """ if dry_run: click.echo("Dry run requested, listing expected command sequence") self.pr_remote = pr_remote self.commit_sha1 = commit_sha1 self.branches = branches self.dry_run = dry_run self.push = push self.prefix_commit = prefix_commit def set_paused_state(self): """Save paused progress state into Git config.""" if self.chosen_config_path is not None: save_cfg_vals_to_git_cfg(config_path=self.chosen_config_path) set_state(WORKFLOW_STATES.BACKPORT_PAUSED) @property def upstream(self): """Get the remote name to use for upstream branches Uses "upstream" if it exists, "origin" otherwise """ cmd = ['git', 'remote', 'get-url', 'upstream'] try: subprocess.check_output(cmd, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError: return "origin" return "upstream" @property def sorted_branches(self): """Return the branches to cherry-pick to, sorted by version.""" return sorted( self.branches, reverse=True, key=version_from_branch) @property def username(self): cmd = ['git', 'config', '--get', f'remote.{self.pr_remote}.url'] raw_result = subprocess.check_output(cmd, stderr=subprocess.STDOUT) result = raw_result.decode('utf-8') # implicit ssh URIs use : to separate host from user, others just use / username = result.replace(':', '/').split('/')[-2] return username def get_cherry_pick_branch(self, maint_branch): return f"backport-{self.commit_sha1[:7]}-{maint_branch}" def get_pr_url(self, base_branch, head_branch): return f"https://github.com/{self.config['team']}/{self.config['repo']}/compare/{base_branch}...{self.username}:{head_branch}?expand=1" def fetch_upstream(self): """ git fetch """ set_state(WORKFLOW_STATES.FETCHING_UPSTREAM) cmd = ['git', 'fetch', self.upstream] self.run_cmd(cmd) set_state(WORKFLOW_STATES.FETCHED_UPSTREAM) def run_cmd(self, cmd): assert not isinstance(cmd, str) if self.dry_run: click.echo(f" dry-run: {' '.join(cmd)}") return output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) click.echo(output.decode('utf-8')) def checkout_branch(self, branch_name): """ git checkout -b """ cmd = ['git', 'checkout', '-b', self.get_cherry_pick_branch(branch_name), f'{self.upstream}/{branch_name}'] try: self.run_cmd(cmd) except subprocess.CalledProcessError as err: click.echo(f"Error checking out the branch {self.get_cherry_pick_branch(branch_name)}.") click.echo(err.output) raise BranchCheckoutException(f"Error checking out the branch {self.get_cherry_pick_branch(branch_name)}.") def get_commit_message(self, commit_sha): """ Return the commit message for the current commit hash, replace # with GH- """ cmd = ['git', 'show', '-s', '--format=%B', commit_sha] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) message = output.strip().decode('utf-8') if self.config['fix_commit_msg']: return message.replace('#', 'GH-') else: return message def checkout_default_branch(self): """ git checkout default branch """ set_state(WORKFLOW_STATES.CHECKING_OUT_DEFAULT_BRANCH) cmd = 'git', 'checkout', self.config['default_branch'] self.run_cmd(cmd) set_state(WORKFLOW_STATES.CHECKED_OUT_DEFAULT_BRANCH) def status(self): """ git status :return: """ cmd = ['git', 'status'] self.run_cmd(cmd) def cherry_pick(self): """ git cherry-pick -x """ cmd = ['git', 'cherry-pick', '-x', self.commit_sha1] try: self.run_cmd(cmd) except subprocess.CalledProcessError as err: click.echo(f"Error cherry-pick {self.commit_sha1}.") click.echo(err.output) raise CherryPickException(f"Error cherry-pick {self.commit_sha1}.") def get_exit_message(self, branch): return \ f""" Failed to cherry-pick {self.commit_sha1} into {branch} \u2639 ... Stopping here. To continue and resolve the conflict: $ cherry_picker --status # to find out which files need attention # Fix the conflict $ cherry_picker --status # should now say 'all conflict fixed' $ cherry_picker --continue To abort the cherry-pick and cleanup: $ cherry_picker --abort """ def amend_commit_message(self, cherry_pick_branch): """ prefix the commit message with (X.Y) """ commit_prefix = "" if self.prefix_commit: commit_prefix = f"[{get_base_branch(cherry_pick_branch)}] " updated_commit_message = f"""{commit_prefix}{self.get_commit_message(self.commit_sha1)} (cherry picked from commit {self.commit_sha1}) Co-authored-by: {get_author_info_from_short_sha(self.commit_sha1)}""" if self.dry_run: click.echo(f" dry-run: git commit --amend -m '{updated_commit_message}'") else: cmd = ['git', 'commit', '--amend', '-m', updated_commit_message] try: subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as cpe: click.echo("Failed to amend the commit message \u2639") click.echo(cpe.output) return updated_commit_message def push_to_remote(self, base_branch, head_branch, commit_message=""): """ git push """ set_state(WORKFLOW_STATES.PUSHING_TO_REMOTE) cmd = ['git', 'push', self.pr_remote, f'{head_branch}:{head_branch}'] try: self.run_cmd(cmd) set_state(WORKFLOW_STATES.PUSHED_TO_REMOTE) except subprocess.CalledProcessError: click.echo(f"Failed to push to {self.pr_remote} \u2639") set_state(WORKFLOW_STATES.PUSHING_TO_REMOTE_FAILED) else: gh_auth = os.getenv("GH_AUTH") if gh_auth: set_state(WORKFLOW_STATES.PR_CREATING) self.create_gh_pr(base_branch, head_branch, commit_message=commit_message, gh_auth=gh_auth) else: set_state(WORKFLOW_STATES.PR_OPENING) self.open_pr(self.get_pr_url(base_branch, head_branch)) def create_gh_pr(self, base_branch, head_branch, *, commit_message, gh_auth): """ Create PR in GitHub """ request_headers = sansio.create_headers( self.username, oauth_token=gh_auth) title, body = normalize_commit_message(commit_message) if not self.prefix_commit: title = f"[{base_branch}] {title}" data = { "title": title, "body": body, "head": f"{self.username}:{head_branch}", "base": base_branch, "maintainer_can_modify": True } url = CREATE_PR_URL_TEMPLATE.format(config=self.config) response = requests.post(url, headers=request_headers, json=data) if response.status_code == requests.codes.created: click.echo(f"Backport PR created at {response.json()['html_url']}") else: click.echo(response.status_code) click.echo(response.text) def open_pr(self, url): """ open url in the web browser """ if self.dry_run: click.echo(f" dry-run: Create new PR: {url}") else: click.echo("Backport PR URL:") click.echo(url) webbrowser.open_new_tab(url) def delete_branch(self, branch): cmd = ['git', 'branch', '-D', branch] self.run_cmd(cmd) def cleanup_branch(self, branch): """Remove the temporary backport branch. Switch to the default branch before that. """ set_state(WORKFLOW_STATES.REMOVING_BACKPORT_BRANCH) self.checkout_default_branch() try: self.delete_branch(branch) except subprocess.CalledProcessError: click.echo(f"branch {branch} NOT deleted.") set_state(WORKFLOW_STATES.REMOVING_BACKPORT_BRANCH_FAILED) else: click.echo(f"branch {branch} has been deleted.") set_state(WORKFLOW_STATES.REMOVED_BACKPORT_BRANCH) def backport(self): if not self.branches: raise click.UsageError("At least one branch must be specified.") set_state(WORKFLOW_STATES.BACKPORT_STARTING) self.fetch_upstream() set_state(WORKFLOW_STATES.BACKPORT_LOOPING) for maint_branch in self.sorted_branches: set_state(WORKFLOW_STATES.BACKPORT_LOOP_START) click.echo(f"Now backporting '{self.commit_sha1}' into '{maint_branch}'") cherry_pick_branch = self.get_cherry_pick_branch(maint_branch) self.checkout_branch(maint_branch) commit_message = "" try: self.cherry_pick() commit_message = self.amend_commit_message(cherry_pick_branch) except subprocess.CalledProcessError as cpe: click.echo(cpe.output) click.echo(self.get_exit_message(maint_branch)) except CherryPickException: click.echo(self.get_exit_message(maint_branch)) self.set_paused_state() raise else: if self.push: self.push_to_remote(maint_branch, cherry_pick_branch, commit_message) self.cleanup_branch(cherry_pick_branch) else: click.echo(\ f""" Finished cherry-pick {self.commit_sha1} into {cherry_pick_branch} \U0001F600 --no-push option used. ... Stopping here. To continue and push the changes: $ cherry_picker --continue To abort the cherry-pick and cleanup: $ cherry_picker --abort """) self.set_paused_state() return # to preserve the correct state set_state(WORKFLOW_STATES.BACKPORT_LOOP_END) set_state(WORKFLOW_STATES.BACKPORT_COMPLETE) def abort_cherry_pick(self): """ run `git cherry-pick --abort` and then clean up the branch """ if self.initial_state != WORKFLOW_STATES.BACKPORT_PAUSED: raise ValueError('One can only abort a paused process.') cmd = ['git', 'cherry-pick', '--abort'] try: set_state(WORKFLOW_STATES.ABORTING) self.run_cmd(cmd) set_state(WORKFLOW_STATES.ABORTED) except subprocess.CalledProcessError as cpe: click.echo(cpe.output) set_state(WORKFLOW_STATES.ABORTING_FAILED) # only delete backport branch created by cherry_picker.py if get_current_branch().startswith('backport-'): self.cleanup_branch(get_current_branch()) reset_stored_config_ref() reset_state() def continue_cherry_pick(self): """ git push origin open the PR clean up branch """ if self.initial_state != WORKFLOW_STATES.BACKPORT_PAUSED: raise ValueError('One can only continue a paused process.') cherry_pick_branch = get_current_branch() if cherry_pick_branch.startswith('backport-'): set_state(WORKFLOW_STATES.CONTINUATION_STARTED) # amend the commit message, prefix with [X.Y] base = get_base_branch(cherry_pick_branch) short_sha = cherry_pick_branch[cherry_pick_branch.index('-')+1:cherry_pick_branch.index(base)-1] full_sha = get_full_sha_from_short(short_sha) commit_message = self.get_commit_message(short_sha) co_author_info = f"Co-authored-by: {get_author_info_from_short_sha(short_sha)}" updated_commit_message = f"""[{base}] {commit_message}. (cherry picked from commit {full_sha}) {co_author_info}""" if self.dry_run: click.echo(f" dry-run: git commit -a -m '{updated_commit_message}' --allow-empty") else: cmd = ['git', 'commit', '-a', '-m', updated_commit_message, '--allow-empty'] subprocess.check_output(cmd, stderr=subprocess.STDOUT) self.push_to_remote(base, cherry_pick_branch) self.cleanup_branch(cherry_pick_branch) click.echo("\nBackport PR:\n") click.echo(updated_commit_message) set_state(WORKFLOW_STATES.BACKPORTING_CONTINUATION_SUCCEED) else: click.echo(f"Current branch ({cherry_pick_branch}) is not a backport branch. Will not continue. \U0001F61B") set_state(WORKFLOW_STATES.CONTINUATION_FAILED) reset_stored_config_ref() reset_state() def check_repo(self): """ Check that the repository is for the project we're configured to operate on. This function performs the check by making sure that the sha specified in the config is present in the repository that we're operating on. """ try: validate_sha(self.config['check_sha']) except ValueError: raise InvalidRepoException() def get_state_and_verify(self): """Return the run progress state stored in the Git config. Raises ValueError if the retrieved state is not of a form that cherry_picker would have stored in the config. """ try: state = get_state() except KeyError as ke: class state: name = str(ke.args[0]) if state not in self.ALLOWED_STATES: raise ValueError( f'Run state cherry-picker.state={state.name} in Git config ' 'is not known.\nPerhaps it has been set by a newer ' 'version of cherry-picker. Try upgrading.\n' 'Valid states are: ' f'{", ".join(s.name for s in self.ALLOWED_STATES)}. ' 'If this looks suspicious, raise an issue at ' 'https://github.com/python/core-workflow/issues/new.\n' 'As the last resort you can reset the runtime state ' 'stored in Git config using the following command: ' '`git config --local --remove-section cherry-picker`' ) return state CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.command(context_settings=CONTEXT_SETTINGS) @click.version_option(version=__version__) @click.option('--dry-run', is_flag=True, help="Prints out the commands, but not executed.") @click.option('--pr-remote', 'pr_remote', metavar='REMOTE', help='git remote to use for PR branches', default='origin') @click.option('--abort', 'abort', flag_value=True, default=None, help="Abort current cherry-pick and clean up branch") @click.option('--continue', 'abort', flag_value=False, default=None, help="Continue cherry-pick, push, and clean up branch") @click.option('--status', 'status', flag_value=True, default=None, help="Get the status of cherry-pick") @click.option('--push/--no-push', 'push', is_flag=True, default=True, help="Changes won't be pushed to remote") @click.option('--config-path', 'config_path', metavar='CONFIG-PATH', help=("Path to config file, .cherry_picker.toml " "from project root by default. You can prepend " "a colon-separated Git 'commitish' reference."), default=None) @click.argument('commit_sha1', nargs=1, default="") @click.argument('branches', nargs=-1) @click.pass_context def cherry_pick_cli(ctx, dry_run, pr_remote, abort, status, push, config_path, commit_sha1, branches): """cherry-pick COMMIT_SHA1 into target BRANCHES.""" click.echo("\U0001F40D \U0001F352 \u26CF") chosen_config_path, config = load_config(config_path) try: cherry_picker = CherryPicker(pr_remote, commit_sha1, branches, dry_run=dry_run, push=push, config=config, chosen_config_path=chosen_config_path) except InvalidRepoException: click.echo(f"You're not inside a {config['repo']} repo right now! \U0001F645") sys.exit(-1) except ValueError as exc: ctx.fail(exc) if abort is not None: if abort: cherry_picker.abort_cherry_pick() else: cherry_picker.continue_cherry_pick() elif status: click.echo(cherry_picker.status()) else: try: cherry_picker.backport() except BranchCheckoutException: sys.exit(-1) except CherryPickException: sys.exit(-1) def get_base_branch(cherry_pick_branch): """ return '2.7' from 'backport-sha-2.7' raises ValueError if the specified branch name is not of a form that cherry_picker would have created """ prefix, sha, base_branch = cherry_pick_branch.split('-', 2) if prefix != 'backport': raise ValueError('branch name is not prefixed with "backport-". Is this a cherry_picker branch?') if not re.match('[0-9a-f]{7,40}', sha): raise ValueError(f'branch name has an invalid sha: {sha}') # Validate that the sha refers to a valid commit within the repo # Throws a ValueError if the sha is not present in the repo validate_sha(sha) # Subject the parsed base_branch to the same tests as when we generated it # This throws a ValueError if the base_branch doesn't meet our requirements version_from_branch(base_branch) return base_branch def validate_sha(sha): """ Validate that a hexdigest sha is a valid commit in the repo raises ValueError if the sha does not reference a commit within the repo """ cmd = ['git', 'log', '-r', sha] try: subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.SubprocessError: raise ValueError(f'The sha listed in the branch name, {sha}, is not present in the repository') def version_from_branch(branch): """ return version information from a git branch name """ try: return tuple(map(int, re.match(r'^.*(?P\d+(\.\d+)+).*$', branch).groupdict()['version'].split('.'))) except AttributeError as attr_err: raise ValueError(f'Branch {branch} seems to not have a version in its name.') from attr_err def get_current_branch(): """ Return the current branch """ cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD'] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) return output.strip().decode('utf-8') def get_full_sha_from_short(short_sha): cmd = ['git', 'log', '-1', '--format=%H', short_sha] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) full_sha = output.strip().decode('utf-8') return full_sha def get_author_info_from_short_sha(short_sha): cmd = ['git', 'log', '-1', '--format=%aN <%ae>', short_sha] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) author = output.strip().decode('utf-8') return author def normalize_commit_message(commit_message): """ Return a tuple of title and body from the commit message """ split_commit_message = commit_message.split("\n") title = split_commit_message[0] body = "\n".join(split_commit_message[1:]) return title, body.lstrip("\n") def is_git_repo(): """Check whether the current folder is a Git repo.""" cmd = 'git', 'rev-parse', '--git-dir' try: subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) return True except subprocess.CalledProcessError: return False def find_config(revision): """Locate and return the default config for current revison.""" if not is_git_repo(): return None cfg_path = f'{revision}:.cherry_picker.toml' cmd = 'git', 'cat-file', '-t', cfg_path try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) path_type = output.strip().decode('utf-8') return cfg_path if path_type == 'blob' else None except subprocess.CalledProcessError: return None def load_config(path=None): """Choose and return the config path and it's contents as dict.""" # NOTE: Initially I wanted to inherit Path to encapsulate Git access # there but there's no easy way to subclass pathlib.Path :( head_sha = get_sha1_from('HEAD') revision = head_sha saved_config_path = load_val_from_git_cfg('config_path') if not path and saved_config_path is not None: path = saved_config_path if path is None: path = find_config(revision=revision) else: if ':' not in path: path = f'{head_sha}:{path}' revision, _col, _path = path.partition(':') if not revision: revision = head_sha config = DEFAULT_CONFIG if path is not None: config_text = from_git_rev_read(path) d = toml.loads(config_text) config = config.new_child(d) return path, config def get_sha1_from(commitish): """Turn 'commitish' into its sha1 hash.""" cmd = ['git', 'rev-parse', commitish] return subprocess.check_output(cmd).strip().decode('utf-8') def reset_stored_config_ref(): """Remove the config path option from Git config.""" try: wipe_cfg_vals_from_git_cfg('config_path') except subprocess.CalledProcessError: """Config file pointer is not stored in Git config.""" def reset_state(): """Remove the progress state from Git config.""" wipe_cfg_vals_from_git_cfg('state') def set_state(state): """Save progress state into Git config.""" save_cfg_vals_to_git_cfg(state=state.name) def get_state(): """Retrieve the progress state from Git config.""" return get_state_from_string(load_val_from_git_cfg('state') or 'UNSET') def save_cfg_vals_to_git_cfg(**cfg_map): """Save a set of options into Git config.""" for cfg_key_suffix, cfg_val in cfg_map.items(): cfg_key = f'cherry-picker.{cfg_key_suffix.replace("_", "-")}' cmd = 'git', 'config', '--local', cfg_key, cfg_val subprocess.check_call(cmd, stderr=subprocess.STDOUT) def wipe_cfg_vals_from_git_cfg(*cfg_opts): """Remove a set of options from Git config.""" for cfg_key_suffix in cfg_opts: cfg_key = f'cherry-picker.{cfg_key_suffix.replace("_", "-")}' cmd = 'git', 'config', '--local', '--unset-all', cfg_key subprocess.check_call(cmd, stderr=subprocess.STDOUT) def load_val_from_git_cfg(cfg_key_suffix): """Retrieve one option from Git config.""" cfg_key = f'cherry-picker.{cfg_key_suffix.replace("_", "-")}' cmd = 'git', 'config', '--local', '--get', cfg_key try: return subprocess.check_output( cmd, stderr=subprocess.DEVNULL, ).strip().decode('utf-8') except subprocess.CalledProcessError: return None def from_git_rev_read(path): """Retrieve given file path contents of certain Git revision.""" if ':' not in path: raise ValueError('Path identifier must start with a revision hash.') cmd = 'git', 'show', '-t', path try: return subprocess.check_output(cmd).rstrip().decode('utf-8') except subprocess.CalledProcessError: raise ValueError def get_state_from_string(state_str): return WORKFLOW_STATES.__members__[state_str] if __name__ == '__main__': cherry_pick_cli() PKUN ||cherry_picker/test.pyimport os import pathlib import subprocess from collections import ChainMap from unittest import mock import pytest import click from .cherry_picker import get_base_branch, get_current_branch, \ get_full_sha_from_short, get_author_info_from_short_sha, \ CherryPicker, InvalidRepoException, CherryPickException, \ normalize_commit_message, DEFAULT_CONFIG, \ get_sha1_from, find_config, load_config, validate_sha, \ from_git_rev_read, \ reset_state, set_state, get_state, \ load_val_from_git_cfg, reset_stored_config_ref, \ WORKFLOW_STATES @pytest.fixture def config(): check_sha = 'dc896437c8efe5a4a5dfa50218b7a6dc0cbe2598' return ChainMap(DEFAULT_CONFIG).new_child({'check_sha': check_sha}) @pytest.fixture def cd(): cwd = os.getcwd() def changedir(d): os.chdir(d) yield changedir # restore CWD back os.chdir(cwd) @pytest.fixture def git_init(): git_init_cmd = 'git', 'init', '.' return lambda: subprocess.run(git_init_cmd, check=True) @pytest.fixture def git_add(): git_add_cmd = 'git', 'add' return lambda *extra_args: ( subprocess.run(git_add_cmd + extra_args, check=True) ) @pytest.fixture def git_checkout(): git_checkout_cmd = 'git', 'checkout' return lambda *extra_args: ( subprocess.run(git_checkout_cmd + extra_args, check=True) ) @pytest.fixture def git_branch(): git_branch_cmd = 'git', 'branch' return lambda *extra_args: ( subprocess.run(git_branch_cmd + extra_args, check=True) ) @pytest.fixture def git_commit(): git_commit_cmd = 'git', 'commit', '-m' return lambda msg, *extra_args: ( subprocess.run(git_commit_cmd + (msg, ) + extra_args, check=True) ) @pytest.fixture def git_cherry_pick(): git_cherry_pick_cmd = 'git', 'cherry-pick' return lambda *extra_args: ( subprocess.run(git_cherry_pick_cmd + extra_args, check=True) ) @pytest.fixture def tmp_git_repo_dir(tmpdir, cd, git_init, git_commit): cd(tmpdir) git_init() git_commit('Initial commit', '--allow-empty') yield tmpdir @mock.patch('subprocess.check_output') def test_get_base_branch(subprocess_check_output): # The format of cherry-pick branches we create are:: # backport-{SHA}-{base_branch} subprocess_check_output.return_value = b'22a594a0047d7706537ff2ac676cdc0f1dcb329c' cherry_pick_branch = 'backport-22a594a-2.7' result = get_base_branch(cherry_pick_branch) assert result == '2.7' @mock.patch('subprocess.check_output') def test_get_base_branch_which_has_dashes(subprocess_check_output): subprocess_check_output.return_value = b'22a594a0047d7706537ff2ac676cdc0f1dcb329c' cherry_pick_branch = 'backport-22a594a-baseprefix-2.7-basesuffix' result = get_base_branch(cherry_pick_branch) assert result == 'baseprefix-2.7-basesuffix' @pytest.mark.parametrize('cherry_pick_branch', ['backport-22a594a', # Not enough fields 'prefix-22a594a-2.7', # Not the prefix we were expecting 'backport-22a594a-base', # No version info in the base branch ] ) @mock.patch('subprocess.check_output') def test_get_base_branch_invalid(subprocess_check_output, cherry_pick_branch): subprocess_check_output.return_value = b'22a594a0047d7706537ff2ac676cdc0f1dcb329c' with pytest.raises(ValueError): get_base_branch(cherry_pick_branch) @mock.patch('subprocess.check_output') def test_get_current_branch(subprocess_check_output): subprocess_check_output.return_value = b'master' assert get_current_branch() == 'master' @mock.patch('subprocess.check_output') def test_get_full_sha_from_short(subprocess_check_output): mock_output = b"""22a594a0047d7706537ff2ac676cdc0f1dcb329c""" subprocess_check_output.return_value = mock_output assert get_full_sha_from_short('22a594a') == '22a594a0047d7706537ff2ac676cdc0f1dcb329c' @mock.patch('subprocess.check_output') def test_get_author_info_from_short_sha(subprocess_check_output): mock_output = b"Armin Rigo " subprocess_check_output.return_value = mock_output assert get_author_info_from_short_sha('22a594a') == 'Armin Rigo ' @pytest.mark.parametrize('input_branches,sorted_branches', [ (['3.1', '2.7', '3.10', '3.6'], ['3.10', '3.6', '3.1', '2.7']), (['stable-3.1', 'lts-2.7', '3.10-other', 'smth3.6else'], ['3.10-other', 'smth3.6else', 'stable-3.1', 'lts-2.7']), ]) @mock.patch('os.path.exists') def test_sorted_branch(os_path_exists, config, input_branches, sorted_branches): os_path_exists.return_value = True cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', input_branches, config=config) assert cp.sorted_branches == sorted_branches @pytest.mark.parametrize('input_branches', [ (['3.1', '2.7', '3.x10', '3.6', '']), (['stable-3.1', 'lts-2.7', '3.10-other', 'smth3.6else', 'invalid']), ]) @mock.patch('os.path.exists') def test_invalid_branches(os_path_exists, config, input_branches): os_path_exists.return_value = True cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', input_branches, config=config) with pytest.raises(ValueError): cp.sorted_branches @mock.patch('os.path.exists') def test_get_cherry_pick_branch(os_path_exists, config): os_path_exists.return_value = True branches = ["3.6"] cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', branches, config=config) assert cp.get_cherry_pick_branch("3.6") == "backport-22a594a-3.6" def test_get_pr_url(config): branches = ["3.6"] cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', branches, config=config) backport_target_branch = cp.get_cherry_pick_branch("3.6") expected_pr_url = ( 'https://github.com/python/cpython/compare/' '3.6...mock_user:backport-22a594a-3.6?expand=1' ) with mock.patch( 'subprocess.check_output', return_value=b'https://github.com/mock_user/cpython.git', ): actual_pr_url = cp.get_pr_url("3.6", backport_target_branch) assert actual_pr_url == expected_pr_url @pytest.mark.parametrize('url', [ b'git@github.com:mock_user/cpython.git', b'git@github.com:mock_user/cpython', b'ssh://git@github.com/mock_user/cpython.git', b'ssh://git@github.com/mock_user/cpython', b'https://github.com/mock_user/cpython.git', b'https://github.com/mock_user/cpython', ]) def test_username(url, config): branches = ["3.6"] cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', branches, config=config) with mock.patch('subprocess.check_output', return_value=url): assert cp.username == 'mock_user' def test_get_updated_commit_message(config): branches = ["3.6"] cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', branches, config=config) with mock.patch( 'subprocess.check_output', return_value=b'bpo-123: Fix Spam Module (#113)', ): actual_commit_message = ( cp.get_commit_message('22a594a0047d7706537ff2ac676cdc0f1dcb329c') ) assert actual_commit_message == 'bpo-123: Fix Spam Module (GH-113)' def test_get_updated_commit_message_without_links_replacement(config): config['fix_commit_msg'] = False branches = ["3.6"] cp = CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', branches, config=config) with mock.patch( 'subprocess.check_output', return_value=b'bpo-123: Fix Spam Module (#113)', ): actual_commit_message = ( cp.get_commit_message('22a594a0047d7706537ff2ac676cdc0f1dcb329c') ) assert actual_commit_message == 'bpo-123: Fix Spam Module (#113)' @mock.patch('subprocess.check_output') def test_is_cpython_repo(subprocess_check_output): subprocess_check_output.return_value = """commit 7f777ed95a19224294949e1b4ce56bbffcb1fe9f Author: Guido van Rossum Date: Thu Aug 9 14:25:15 1990 +0000 Initial revision """ # should not raise an exception validate_sha('22a594a0047d7706537ff2ac676cdc0f1dcb329c') def test_is_not_cpython_repo(): # use default CPython sha to fail on this repo with pytest.raises(InvalidRepoException): CherryPicker('origin', '22a594a0047d7706537ff2ac676cdc0f1dcb329c', ["3.6"]) def test_find_config(tmpdir, cd): cd(tmpdir) subprocess.run('git init .'.split(), check=True) relative_config_path = '.cherry_picker.toml' cfg = tmpdir.join(relative_config_path) cfg.write('param = 1') subprocess.run('git add .'.split(), check=True) subprocess.run(('git', 'commit', '-m', 'Initial commit'), check=True) scm_revision = get_sha1_from('HEAD') assert find_config(scm_revision) == scm_revision + ':' + relative_config_path def test_find_config_not_found(tmpdir, cd): cd(tmpdir) subprocess.run('git init .'.split(), check=True) subprocess.run(('git', 'commit', '-m', 'Initial commit', '--allow-empty'), check=True) scm_revision = get_sha1_from('HEAD') assert find_config(scm_revision) is None def test_find_config_not_git(tmpdir, cd): cd(tmpdir) assert find_config(None) is None def test_load_full_config(tmpdir, cd): cd(tmpdir) subprocess.run('git init .'.split(), check=True) relative_config_path = '.cherry_picker.toml' cfg = tmpdir.join(relative_config_path) cfg.write('''\ team = "python" repo = "core-workfolow" check_sha = "5f007046b5d4766f971272a0cc99f8461215c1ec" default_branch = "devel" ''') subprocess.run('git add .'.split(), check=True) subprocess.run(('git', 'commit', '-m', 'Initial commit'), check=True) scm_revision = get_sha1_from('HEAD') cfg = load_config(None) assert cfg == ( scm_revision + ':' + relative_config_path, { 'check_sha': '5f007046b5d4766f971272a0cc99f8461215c1ec', 'repo': 'core-workfolow', 'team': 'python', 'fix_commit_msg': True, 'default_branch': 'devel', }, ) def test_load_partial_config(tmpdir, cd): cd(tmpdir) subprocess.run('git init .'.split(), check=True) relative_config_path = '.cherry_picker.toml' cfg = tmpdir.join(relative_config_path) cfg.write('''\ repo = "core-workfolow" ''') subprocess.run('git add .'.split(), check=True) subprocess.run(('git', 'commit', '-m', 'Initial commit'), check=True) scm_revision = get_sha1_from('HEAD') cfg = load_config(relative_config_path) assert cfg == ( f'{scm_revision}:{relative_config_path}', { 'check_sha': '7f777ed95a19224294949e1b4ce56bbffcb1fe9f', 'repo': 'core-workfolow', 'team': 'python', 'fix_commit_msg': True, 'default_branch': 'master', }, ) def test_load_config_no_head_sha(tmp_git_repo_dir, git_add, git_commit): relative_config_path = '.cherry_picker.toml' tmp_git_repo_dir.join(relative_config_path).write('''\ team = "python" repo = "core-workfolow" check_sha = "5f007046b5d4766f971272a0cc99f8461215c1ec" default_branch = "devel" ''') git_add(relative_config_path) git_commit(f'Add {relative_config_path}') scm_revision = get_sha1_from('HEAD') with mock.patch( 'cherry_picker.cherry_picker.get_sha1_from', return_value='', ): cfg = load_config(relative_config_path) assert cfg == ( ':' + relative_config_path, { 'check_sha': '5f007046b5d4766f971272a0cc99f8461215c1ec', 'repo': 'core-workfolow', 'team': 'python', 'fix_commit_msg': True, 'default_branch': 'devel', }, ) def test_normalize_long_commit_message(): commit_message = """[3.6] Fix broken `Show Source` links on documentation pages (GH-3113) The `Show Source` was broken because of a change made in sphinx 1.5.1 In Sphinx 1.4.9, the sourcename was "index.txt". In Sphinx 1.5.1+, it is now "index.rst.txt". (cherry picked from commit b9ff498793611d1c6a9b99df464812931a1e2d69) Co-authored-by: Elmar Ritsch <35851+elritsch@users.noreply.github.com>""" title, body = normalize_commit_message(commit_message) assert title == "[3.6] Fix broken `Show Source` links on documentation pages (GH-3113)" assert body == """The `Show Source` was broken because of a change made in sphinx 1.5.1 In Sphinx 1.4.9, the sourcename was "index.txt". In Sphinx 1.5.1+, it is now "index.rst.txt". (cherry picked from commit b9ff498793611d1c6a9b99df464812931a1e2d69) Co-authored-by: Elmar Ritsch <35851+elritsch@users.noreply.github.com>""" def test_normalize_short_commit_message(): commit_message = """[3.6] Fix broken `Show Source` links on documentation pages (GH-3113) (cherry picked from commit b9ff498793611d1c6a9b99df464812931a1e2d69) Co-authored-by: Elmar Ritsch <35851+elritsch@users.noreply.github.com>""" title, body = normalize_commit_message(commit_message) assert title == "[3.6] Fix broken `Show Source` links on documentation pages (GH-3113)" assert body == """(cherry picked from commit b9ff498793611d1c6a9b99df464812931a1e2d69) Co-authored-by: Elmar Ritsch <35851+elritsch@users.noreply.github.com>""" @pytest.mark.parametrize( 'input_path', ( '/some/path/without/revision', 'HEAD:some/non-existent/path', ), ) def test_from_git_rev_read_negative( input_path, tmp_git_repo_dir, ): with pytest.raises(ValueError): from_git_rev_read(input_path) def test_from_git_rev_read_uncommitted(tmp_git_repo_dir, git_add, git_commit): some_text = 'blah blah 🤖' relative_file_path = '.some.file' tmp_git_repo_dir.join(relative_file_path).write(some_text) git_add('.') with pytest.raises(ValueError): from_git_rev_read('HEAD:' + relative_file_path) == some_text def test_from_git_rev_read(tmp_git_repo_dir, git_add, git_commit): some_text = 'blah blah 🤖' relative_file_path = '.some.file' tmp_git_repo_dir.join(relative_file_path).write(some_text) git_add('.') git_commit('Add some file') assert from_git_rev_read('HEAD:' + relative_file_path) == some_text def test_states(tmp_git_repo_dir): class state_val: name = 'somerandomwords' # First, verify that there's nothing there initially assert get_state() == WORKFLOW_STATES.UNSET # Now, set some val set_state(state_val) with pytest.raises(KeyError, match=state_val.name): get_state() # Wipe it again reset_state() assert get_state() == WORKFLOW_STATES.UNSET def test_paused_flow(tmp_git_repo_dir, git_add, git_commit): assert load_val_from_git_cfg('config_path') is None initial_scm_revision = get_sha1_from('HEAD') relative_file_path = 'some.toml' tmp_git_repo_dir.join(relative_file_path).write(f'''\ check_sha = "{initial_scm_revision}" repo = "core-workfolow" ''') git_add(relative_file_path) git_commit('Add a config') config_scm_revision = get_sha1_from('HEAD') config_path_rev = config_scm_revision + ':' + relative_file_path chosen_config_path, config = load_config(config_path_rev) cherry_picker = CherryPicker( 'origin', config_scm_revision, [], config=config, chosen_config_path=chosen_config_path, ) assert get_state() == WORKFLOW_STATES.UNSET cherry_picker.set_paused_state() assert load_val_from_git_cfg('config_path') == config_path_rev assert get_state() == WORKFLOW_STATES.BACKPORT_PAUSED chosen_config_path, config = load_config(None) assert chosen_config_path == config_path_rev reset_stored_config_ref() assert load_val_from_git_cfg('config_path') is None @pytest.mark.parametrize( 'method_name,start_state,end_state', ( ( 'fetch_upstream', WORKFLOW_STATES.FETCHING_UPSTREAM, WORKFLOW_STATES.FETCHED_UPSTREAM, ), ( 'checkout_default_branch', WORKFLOW_STATES.CHECKING_OUT_DEFAULT_BRANCH, WORKFLOW_STATES.CHECKED_OUT_DEFAULT_BRANCH, ), ), ) def test_start_end_states( method_name, start_state, end_state, tmp_git_repo_dir, ): assert get_state() == WORKFLOW_STATES.UNSET with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) assert get_state() == WORKFLOW_STATES.UNSET def _fetch(cmd): assert get_state() == start_state with mock.patch.object(cherry_picker, 'run_cmd', _fetch): getattr(cherry_picker, method_name)() assert get_state() == end_state def test_cleanup_branch( tmp_git_repo_dir, git_checkout, ): assert get_state() == WORKFLOW_STATES.UNSET with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) assert get_state() == WORKFLOW_STATES.UNSET git_checkout('-b', 'some_branch') cherry_picker.cleanup_branch('some_branch') assert get_state() == WORKFLOW_STATES.REMOVED_BACKPORT_BRANCH def test_cleanup_branch_fail(tmp_git_repo_dir): assert get_state() == WORKFLOW_STATES.UNSET with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) assert get_state() == WORKFLOW_STATES.UNSET cherry_picker.cleanup_branch('some_branch') assert get_state() == WORKFLOW_STATES.REMOVING_BACKPORT_BRANCH_FAILED def test_cherry_pick( tmp_git_repo_dir, git_add, git_branch, git_commit, git_checkout, ): cherry_pick_target_branches = '3.8', pr_remote = 'origin' test_file = 'some.file' tmp_git_repo_dir.join(test_file).write('some contents') git_branch(cherry_pick_target_branches[0]) git_branch( f'{pr_remote}/{cherry_pick_target_branches[0]}', cherry_pick_target_branches[0], ) git_add(test_file) git_commit('Add a test file') scm_revision = get_sha1_from('HEAD') git_checkout( # simulate backport method logic cherry_pick_target_branches[0], ) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker( pr_remote, scm_revision, cherry_pick_target_branches, ) cherry_picker.cherry_pick() def test_cherry_pick_fail( tmp_git_repo_dir, ): with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) with pytest.raises(CherryPickException, match='^Error cherry-pick xxx.$'): cherry_picker.cherry_pick() def test_get_state_and_verify_fail( tmp_git_repo_dir, ): class tested_state: name = 'invalid_state' set_state(tested_state) expected_msg_regexp = ( fr'^Run state cherry-picker.state={tested_state.name} in Git config ' r'is not known.' '\n' r'Perhaps it has been set by a newer ' r'version of cherry-picker\. Try upgrading\.' '\n' r'Valid states are: ' r'[\w_\s]+(, [\w_\s]+)*\. ' r'If this looks suspicious, raise an issue at ' r'https://github.com/python/core-workflow/issues/new\.' '\n' r'As the last resort you can reset the runtime state ' r'stored in Git config using the following command: ' r'`git config --local --remove-section cherry-picker`' ) with \ mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ), \ pytest.raises(ValueError, match=expected_msg_regexp): cherry_picker = CherryPicker('origin', 'xxx', []) def test_push_to_remote_fail(tmp_git_repo_dir): with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) cherry_picker.push_to_remote('master', 'backport-branch-test') assert get_state() == WORKFLOW_STATES.PUSHING_TO_REMOTE_FAILED def test_push_to_remote_interactive(tmp_git_repo_dir): with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) with \ mock.patch.object(cherry_picker, 'run_cmd'), \ mock.patch.object(cherry_picker, 'open_pr'), \ mock.patch.object( cherry_picker, 'get_pr_url', return_value='https://pr_url', ): cherry_picker.push_to_remote('master', 'backport-branch-test') assert get_state() == WORKFLOW_STATES.PR_OPENING def test_push_to_remote_botflow(tmp_git_repo_dir, monkeypatch): monkeypatch.setenv('GH_AUTH', 'True') with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) with \ mock.patch.object(cherry_picker, 'run_cmd'), \ mock.patch.object(cherry_picker, 'create_gh_pr'): cherry_picker.push_to_remote('master', 'backport-branch-test') assert get_state() == WORKFLOW_STATES.PR_CREATING def test_backport_no_branch(tmp_git_repo_dir, monkeypatch): with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) with pytest.raises( click.UsageError, match='^At least one branch must be specified.$', ): cherry_picker.backport() def test_backport_cherry_pick_fail( tmp_git_repo_dir, git_branch, git_add, git_commit, git_checkout, ): cherry_pick_target_branches = '3.8', pr_remote = 'origin' test_file = 'some.file' tmp_git_repo_dir.join(test_file).write('some contents') git_branch(cherry_pick_target_branches[0]) git_branch( f'{pr_remote}/{cherry_pick_target_branches[0]}', cherry_pick_target_branches[0], ) git_add(test_file) git_commit('Add a test file') scm_revision = get_sha1_from('HEAD') git_checkout( # simulate backport method logic cherry_pick_target_branches[0], ) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker( pr_remote, scm_revision, cherry_pick_target_branches, ) with \ pytest.raises(CherryPickException), \ mock.patch.object(cherry_picker, 'checkout_branch'), \ mock.patch.object(cherry_picker, 'fetch_upstream'), \ mock.patch.object( cherry_picker, 'cherry_pick', side_effect=CherryPickException, ): cherry_picker.backport() assert get_state() == WORKFLOW_STATES.BACKPORT_PAUSED def test_backport_cherry_pick_crash_ignored( tmp_git_repo_dir, git_branch, git_add, git_commit, git_checkout, ): cherry_pick_target_branches = '3.8', pr_remote = 'origin' test_file = 'some.file' tmp_git_repo_dir.join(test_file).write('some contents') git_branch(cherry_pick_target_branches[0]) git_branch( f'{pr_remote}/{cherry_pick_target_branches[0]}', cherry_pick_target_branches[0], ) git_add(test_file) git_commit('Add a test file') scm_revision = get_sha1_from('HEAD') git_checkout( # simulate backport method logic cherry_pick_target_branches[0], ) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker( pr_remote, scm_revision, cherry_pick_target_branches, ) with \ mock.patch.object(cherry_picker, 'checkout_branch'), \ mock.patch.object(cherry_picker, 'fetch_upstream'), \ mock.patch.object(cherry_picker, 'cherry_pick'), \ mock.patch.object( cherry_picker, 'amend_commit_message', side_effect=subprocess.CalledProcessError( 1, ( 'git', 'commit', '-am', 'new commit message', ), ) ): cherry_picker.backport() assert get_state() == WORKFLOW_STATES.BACKPORT_COMPLETE def test_backport_success( tmp_git_repo_dir, git_branch, git_add, git_commit, git_checkout, ): cherry_pick_target_branches = '3.8', pr_remote = 'origin' test_file = 'some.file' tmp_git_repo_dir.join(test_file).write('some contents') git_branch(cherry_pick_target_branches[0]) git_branch( f'{pr_remote}/{cherry_pick_target_branches[0]}', cherry_pick_target_branches[0], ) git_add(test_file) git_commit('Add a test file') scm_revision = get_sha1_from('HEAD') git_checkout( # simulate backport method logic cherry_pick_target_branches[0], ) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker( pr_remote, scm_revision, cherry_pick_target_branches, ) with \ mock.patch.object(cherry_picker, 'checkout_branch'), \ mock.patch.object(cherry_picker, 'fetch_upstream'), \ mock.patch.object(cherry_picker, 'amend_commit_message', return_value='commit message'): cherry_picker.backport() assert get_state() == WORKFLOW_STATES.BACKPORT_COMPLETE def test_backport_pause_and_continue( tmp_git_repo_dir, git_branch, git_add, git_commit, git_checkout, ): cherry_pick_target_branches = '3.8', pr_remote = 'origin' test_file = 'some.file' tmp_git_repo_dir.join(test_file).write('some contents') git_branch(cherry_pick_target_branches[0]) git_branch( f'{pr_remote}/{cherry_pick_target_branches[0]}', cherry_pick_target_branches[0], ) git_add(test_file) git_commit('Add a test file') scm_revision = get_sha1_from('HEAD') git_checkout( # simulate backport method logic cherry_pick_target_branches[0], ) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker( pr_remote, scm_revision, cherry_pick_target_branches, push=False, ) with \ mock.patch.object(cherry_picker, 'checkout_branch'), \ mock.patch.object(cherry_picker, 'fetch_upstream'), \ mock.patch.object(cherry_picker, 'amend_commit_message', return_value='commit message'): cherry_picker.backport() assert get_state() == WORKFLOW_STATES.BACKPORT_PAUSED cherry_picker.initial_state = get_state() with \ mock.patch( 'cherry_picker.cherry_picker.wipe_cfg_vals_from_git_cfg', ), \ mock.patch( 'cherry_picker.cherry_picker.get_full_sha_from_short', return_value='xxxxxxyyyyyy', ), \ mock.patch( 'cherry_picker.cherry_picker.get_base_branch', return_value='3.8', ), \ mock.patch( 'cherry_picker.cherry_picker.get_current_branch', return_value='backport-xxx-3.8', ), \ mock.patch( 'cherry_picker.cherry_picker.get_author_info_from_short_sha', return_value='Author Name ', ), \ mock.patch.object(cherry_picker, 'get_commit_message', return_value='commit message'), \ mock.patch.object(cherry_picker, 'checkout_branch'), \ mock.patch.object(cherry_picker, 'fetch_upstream'): cherry_picker.continue_cherry_pick() assert get_state() == WORKFLOW_STATES.BACKPORTING_CONTINUATION_SUCCEED def test_continue_cherry_pick_invalid_state(tmp_git_repo_dir): assert get_state() == WORKFLOW_STATES.UNSET with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) assert get_state() == WORKFLOW_STATES.UNSET with pytest.raises( ValueError, match=r'^One can only continue a paused process.$', ): cherry_picker.continue_cherry_pick() assert get_state() == WORKFLOW_STATES.UNSET # success def test_continue_cherry_pick_invalid_branch(tmp_git_repo_dir): set_state(WORKFLOW_STATES.BACKPORT_PAUSED) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) with mock.patch('cherry_picker.cherry_picker.wipe_cfg_vals_from_git_cfg'): cherry_picker.continue_cherry_pick() assert get_state() == WORKFLOW_STATES.CONTINUATION_FAILED def test_abort_cherry_pick_invalid_state(tmp_git_repo_dir): assert get_state() == WORKFLOW_STATES.UNSET with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) assert get_state() == WORKFLOW_STATES.UNSET with pytest.raises( ValueError, match=r'^One can only abort a paused process.$', ): cherry_picker.abort_cherry_pick() def test_abort_cherry_pick_fail(tmp_git_repo_dir): set_state(WORKFLOW_STATES.BACKPORT_PAUSED) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker('origin', 'xxx', []) with mock.patch('cherry_picker.cherry_picker.wipe_cfg_vals_from_git_cfg'): cherry_picker.abort_cherry_pick() assert get_state() == WORKFLOW_STATES.ABORTING_FAILED def test_abort_cherry_pick_success( tmp_git_repo_dir, git_branch, git_add, git_commit, git_checkout, git_cherry_pick, ): cherry_pick_target_branches = '3.8', pr_remote = 'origin' test_file = 'some.file' git_branch( f'backport-xxx-{cherry_pick_target_branches[0]}', ) tmp_git_repo_dir.join(test_file).write('some contents') git_add(test_file) git_commit('Add a test file') scm_revision = get_sha1_from('HEAD') git_checkout( f'backport-xxx-{cherry_pick_target_branches[0]}', ) tmp_git_repo_dir.join(test_file).write('some other contents') git_add(test_file) git_commit('Add a test file again') try: git_cherry_pick( # simulate a conflict with pause scm_revision, ) except subprocess.CalledProcessError: pass set_state(WORKFLOW_STATES.BACKPORT_PAUSED) with mock.patch( 'cherry_picker.cherry_picker.validate_sha', return_value=True, ): cherry_picker = CherryPicker( pr_remote, scm_revision, cherry_pick_target_branches, ) with mock.patch('cherry_picker.cherry_picker.wipe_cfg_vals_from_git_cfg'): cherry_picker.abort_cherry_pick() assert get_state() == WORKFLOW_STATES.REMOVED_BACKPORT_BRANCH PK!HMj/M.cherry_picker-1.3.0.dist-info/entry_points.txtN+I/N.,()JH-*/LN-Ex9\\PK!HPO#cherry_picker-1.3.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!HZ6&cherry_picker-1.3.0.dist-info/METADATA[rȕxg$W HRɚ8) 4IAAJ<~t7.$[pfl>}מ,kU8K(zJpb}gUxazJCJ(\f)tJ.U! TLC%fZ*xﲕsq˲`e5 l5 dy B;]T٧qšx/XZN!03o~=3Sa;.⵺VI#%\_XxwU\(P^0^+q^IF<*u2Ju[nBI"1cRdBVq߁'k wHE⸊bE8"V>%.>fF viCX`@q@K6,hH/ Tԛll>G?_xTHx'sGX-=?xykb6k~Y FY~ =O~^Ze߾?GzkRf<^@˥89~|RrC2N+J/,ۏ4oWz)Gⅳ#C6V( ?УRl:/_؀(VP(wh&??leܷH~.6HL=7ׇvĸgz:%lehl*a MT`x n83bur[V+gYLhqMl@Jڃ\kPMğ 9r;bƶm-+u$" 0q gc#`Ye `A kD=z HUF8Lbsjh>ax>}YITX! yZc>+lU4z+h˼O8ZE1/OgqGX!AO"Z(DKYC8q?ZwƝ`X4X(z[%yƭNm_ӟXbr艞41S@ۤU:SvwS(fP8xC!'| ?Ӌ޳lHO? 1.Npčއ[#X9|9tG>l)ۂem.Ö {0?88Psh|l?Tl՛ .=&.THdߘ;]NcMeLЇi"= &D] \0SL"}AȪ4"R!m'[ȹM2MĄQp aL7)#;t?0teR q5%at\7-{ɧ6M5\^sjE[o1G^nti=o9N=EPp[,iUGjM)˥ꯇ!y39 Z;shw,qr=Pp ԁ:1iDc^ЩTx*GQDD1T{#9}9<`ww4~CG؇ ZBl>L$֌75AYiYuiw-lR:lo}-[@sz}>̠P{mĞ àڦѩ9*"4D}'^`Sqv8M~ /q##$8$8oD (kw;sa-T ";r)EV-咇_nm(58Whۊxj)mWniuk!ue ב?'pz/?Q(j-GnP˳qֽ"?lSyԝ*|mص&-RzGZ8_o{'N+Oڤ3j@F*)17wA^a)QRzWVåUh\w/9߼C j-K(/2_p-ژ~Ph-bB.P5fzp,AqzmnJy֭lvܣLU7mS$ߌdI쀲SQCJ-t=銯I{GII\VGtn`n 𸢓pTƉP@0B6 g n\YK7;N&edb ūj1o'=9Y{[2r_LvIc=nB[铥5EU!hLnKa&VhL#] i2KxsͣvKۼ܌#94RH \,]x/w9b{ &\j;eMȦ}+s}k3V"+@η7`cʬ}owOQ@pH NT>N}\S.=: Vz"R sW*p Bcךe ;Q%tX ~ژ9,^HI6,9]2[:.Vw|"Ip3~ KMڎ#{sNO䍓 33SQ^h5Ve(.Lb5J3Uy'& { PKUN rZZcherry_picker/__init__.pyPKUNh7$]]cherry_picker/__main__.pyPKUNgg%cherry_picker/cherry_picker.pyPKUN ||icherry_picker/test.pyPK!HMj/M.cherry_picker-1.3.0.dist-info/entry_points.txtPK!HPO#cherry_picker-1.3.0.dist-info/WHEELPK!HZ6&#cherry_picker-1.3.0.dist-info/METADATAPK!H8Ӏ$Dcherry_picker-1.3.0.dist-info/RECORDPKp