PK!2 |<<github_macros/__init__.pyfrom __future__ import print_function __version__ = '2.0.0' PK!Uv&&github_macros/cli/__init__.pyfrom __future__ import print_function PK!\X:]]]github_macros/cli/_base.pyimport argparse import sys from github_macros.http import GithubHttp class MyParser(argparse.ArgumentParser): def error(self, message): sys.stderr.write('ERROR: %s\n\n' % message) self.print_help() sys.exit(2) def create_client(username, token): if not username: raise KeyError('Requires Github username to be given via GITHUB_USER variable or command line flag') if not token: raise KeyError('Requires Github personal access token to be given via GITHUB_TOKEN variable or command line flag') return GithubHttp(username=username, token=token) PK!0(/(/&github_macros/cli/branch_protection.pyimport os import sys from github_macros.cli._base import MyParser, create_client from github_macros.models.github import GithubOrganization, GithubUser, GithubRepository from github_macros import __version__ def get_args(): defaults = {} p = MyParser() p.add_argument('--version', '-v', action='version', help='Prints the program version and exits', version='%(prog)s ' + __version__) p.add_argument('--branch', '-b', dest='branches', action='append', default=[], required=True, help='Branch name in which to check branch protections') mapping = p.add_argument_group('Repository listing', 'Use these options for choosing ' 'how to select your list of repositories to check against ' 'for branch protection settings') mapping.add_argument('--repository', '-r', dest='repositories', action='append', default=[], help='GitHub repository in which to check branch protections') mapping.add_argument('--user', '-u', dest='users', action='append', default=[], help='GitHub user in which to check branch protections for all repositories') mapping.add_argument('--organization', '-o', dest='organizations', action='append', default=[], help='GitHub organization in which to update hooks for all repositories') p.add_argument('--github-user', dest='gh_user', action='store', default=os.getenv('GITHUB_USER')) p.add_argument('--github-token', dest='gh_token', action='store', default=os.getenv('GITHUB_TOKEN')) # CHECKS: status_checks = p.add_argument_group('Commit status checks') status_checks.add_argument('--ci-check', dest='contexts', action='append', default=[], help='Required checks (e.g., from Jenkins or other CI) in order to allow merging a PR') code_review = p.add_argument_group('Code review process') defaults['require_code_review'] = None # If `is None`, we don't care about this setting code_review_opt = code_review.add_mutually_exclusive_group() code_review_opt.add_argument('--code-review', dest='require_code_review', action='store_true', help='Code review is necessary before merging') code_review_opt.add_argument('--no-code-review', dest='require_code_review', action='store_false', help='Code review is extra credit') defaults['auto_dismiss_review'] = None # If `is None`, we don't care about this setting code_review_autodismiss = code_review.add_mutually_exclusive_group() code_review_autodismiss.add_argument('--auto-dismiss-review', dest='auto_dismiss_review', action='store_true', help='Pushing changes after receiving a review should automatically ' 'require re-review') code_review_autodismiss.add_argument('--no-auto-dismiss-review', dest='auto_dismiss_review', action='store_false', help='Pushing changes after receiving a review should not affect anything') defaults['restrict_dismiss_reviews'] = None # If `is None`, we don't care about this setting code_review_dismiss = code_review.add_mutually_exclusive_group() code_review_dismiss.add_argument('--restrict-dismiss-review', dest='restrict_dismiss_review', action='store_true', help='No one can dismiss a code review except the one who gave it (or specified by ' '--dismiss-user or --dismiss-team)') code_review_dismiss.add_argument('--no-restrict-dismiss-review', dest='restrict_dismiss_review', action='store_false', help='Anyone with write permissions to the repository can dismiss a code review') code_review.add_argument('--dismiss-user', dest='dismiss_review_users', action='append', default=[], help='Ignore review dismissal restrictions for this user (allows multiple invocations of --dismiss-user)') code_review.add_argument('--dismiss-team', dest='dismiss_review_teams', action='append', default=[], help='Ignore review dismissal restrictions for this team (allows multiple invocations of --dismiss-team)') defaults['except_admins'] = None # If `is None`, we don't care about this setting admins = p.add_argument_group('Administrators should follow the rules too') admins_opts = admins.add_mutually_exclusive_group() admins_opts.add_argument('--enforce-for-admins', dest='except_admins', action='store_false', help='Force repository/organization admins to also comply with branch protections') admins_opts.add_argument('--no-enforce-for-admins', dest='except_admins', action='store_true', help='Allow repository/organization admins to override branch protections') uptodate = p.add_argument_group('Branch commits relative to upstream') defaults['branch_up_to_date'] = None # If `is None`, we don't care about this setting uptodate_opt = uptodate.add_mutually_exclusive_group() uptodate_opt.add_argument('--branch-up-to-date', dest='branch_up_to_date', action='store_true', help='Require (as a status check) that the branch being merged must be ' 'up-to-date with the upstream branch') uptodate_opt.add_argument('--no-branch-up-to-date', dest='branch_up_to_date', action='store_false', help='Ignore if the branch being merged is not up-to-date with the upstream branch') # TODO: add a nice description of the rule this covers push = p.add_argument_group('Pushing commits directly to the branch') defaults['restrict_push'] = None # If `is None`, we don't care about this setting push_opt = push.add_mutually_exclusive_group() push_opt.add_argument('--restrict-push', dest='restrict_push', action='store_true', help='Disallow pushing to the specified branch (exceptions noted with --push-user ' 'and --push-team)') push_opt.add_argument('--no-restrict-push', dest='restrict_push', action='store_false', help='Allow pushing to the specified branch by those with write access') push.add_argument('--push-user', dest='allowed_push_users', action='append', default=[], help='Ignore push restrictions for this user (allows multiple invocations of --push-user)') push.add_argument('--push-team', dest='allowed_push_teams', action='append', default=[], help='Ignore push restrictions for this team (allows multiple invocations of --push-team)') p.set_defaults(**defaults) return p.parse_args() def index_repos(client, repo_names=None, org_names=None, usernames=None): repositories = [] for org_name in (org_names if org_names else []): # org.repositories is a lazy-loaded item, so we don't need to fetch all the info on the org org = GithubOrganization(client, org_name) repositories += org.repositories for username in (usernames if usernames else []): # user.repositories is a lazy-loaded item, so we don't need to fetch all the info on the person user = GithubUser(client, username) repositories += user.repositories for repo_name in (repo_names if repo_names else []): repositories.append(GithubRepository.fetch(client, repo_name)) return set(repositories) def error(repo, branch, option_name): msg = "ERROR:: {repo} @ {branch} => {opt}\n" sys.stderr.write(msg.format(repo=repo.full_name, branch=branch.name, opt=option_name)) def main(): opt = get_args() client = create_client(username=opt.gh_user, token=opt.gh_token) # collecting repo objects for all the things repositories = index_repos(client=client, repo_names=opt.repositories, org_names=opt.organizations, usernames=opt.users) all_errors = 0 for repo in repositories: # print('REPO: {name}'.format(name=str(repo.full_name))) repo.refresh() repo_errors = 0 for branch in repo.branches: if branch.name not in opt.branches: continue errors = 0 if not branch.protection.enabled: error(repo, branch, 'Branch protection is disabled') errors += repo_push_checks(branch.protection, opt) errors += repo_code_review(branch.protection, opt) errors += repo_status_checks(branch.protection, opt) errors += repo_admin_exemptions(branch.protection, opt) if errors > 0: fix_url = '{base}/settings/branches/{branch}'.format(base=repo.url, branch=branch.name) msg = '===============> Fix it: {url} <===============' sys.stderr.write(msg.format(url=fix_url) + '\n\n') repo_errors += errors all_errors += repo_errors sys.exit(0 if all_errors == 0 else 1) # RULE CHECKERS def repo_push_checks(protection, opt): errors = 0 branch = protection.branch repo = branch.repository if opt.restrict_push is None: return errors if protection.push_restrictions != opt.restrict_push: if protection.push_restrictions: error(repo, branch, 'Push restrictions are enabled') else: error(repo, branch, 'Push restrictions are disabled') errors += 1 if not opt.restrict_push: return errors # TODO: Check list of users where we're overriding push restrictions # TODO: Check list of teams where we're overriding push restrictions return errors def repo_code_review(protection, opt): errors = 0 branch = protection.branch repo = branch.repository if opt.require_code_review is None: return errors if protection.required_code_review != opt.require_code_review: if protection.required_code_review: error(repo, branch, 'Mandatory code review is enabled') else: error(repo, branch, 'Mandatory code review is disabled') errors += 1 if not opt.require_code_review: return errors if opt.auto_dismiss_review is not None and \ protection.dismiss_stale_reviews != opt.auto_dismiss_review: if protection.dismiss_stale_reviews: error(repo, branch, 'Reviews are automatically dismissed when new code is pushed') else: error(repo, branch, 'Reviews remain valid when new code is pushed') errors += 1 return errors def repo_status_checks(protection, opt): errors = 0 branch = protection.branch repo = branch.repository if opt.branch_up_to_date is not None: if protection.up_to_date != opt.branch_up_to_date: if protection.up_to_date: error(repo, branch, 'Branch must be up-to-date with upstream') else: error(repo, branch, 'Branch can be, but is not mandated to be, up-to-date with upstream') errors += 1 for check in opt.contexts: if check not in protection.contexts: error(repo, branch, 'Branch is missing the "{}" check'.format(check)) errors += 1 return errors def repo_admin_exemptions(protection, opt): errors = 0 branch = protection.branch repo = branch.repository if opt.except_admins is not None: if opt.except_admins != protection.except_admins: if protection.except_admins: error(repo, branch, 'Administrators are exempt from branch protections') else: error(repo, branch, 'Administrators must also follow branch protections') errors += 1 return errors if __name__ == '__main__': main() PK!>-github_macros/cli/refresh.pyimport os from github_macros.cli._base import MyParser, create_client from github_macros.models.github import GithubOrganization, GithubUser from github_macros import __version__ from sh.contrib import git as git_cmd import sh def git(*args, **kwargs): exc = None for attempt in range(3): try: return git_cmd(*args, **kwargs) except sh.ErrorReturnCode_128 as e: exc = e continue if exc: raise exc def git_ref_resolve(repo, ref): path = os.path.join(repo.owner.name, repo.name) try: return git('-C', path, 'rev-parse', '--abbrev-ref', ref).strip() except sh.ErrorReturnCode: return None def clone(repo, fake=False, clobber=False): """ """ path = os.path.join(repo.owner.name, repo.name) if os.path.exists(path): print(' REPO: Updating {repo}'.format(repo=repo.full_name)) if fake: return git('-C', path, 'fetch', 'origin') if not clobber: return branch_name = git_ref_resolve(repo, 'HEAD') if branch_name == 'master': git('-C', path, 'reset', '--hard', 'origin/master') elif branch_name: git('-C', path, 'branch', '-f', 'master', 'origin/master') elif git_ref_resolve(repo, 'origin/master'): git('-C', path, 'pull', 'origin', 'master') else: os.makedirs(path) print(' REPO: Cloning {repo}'.format(repo=repo.full_name)) if fake: return git('clone', repo.clone_url, path) def get_args(): p = MyParser() p.add_argument('--version', '-v', action='version', help='Prints the program version and exits', version='%(prog)s ' + __version__) p.add_argument('--user', '-u', dest='users', action='append', default=[os.getenv('GITHUB_USER')] if os.getenv('GITHUB_USER') else [], help='GitHub user from which to clone/update all repositories') p.add_argument('--organization', '-o', dest='organizations', action='append', default=[], help='GitHub organization from which to clone/update all repositories') p.add_argument('--base-dir', dest='base_directory', action='store', default=os.getcwd(), help='The directory where repositories will be cloned in a Github-like directory structure') p.add_argument('--github-user', dest='gh_user', action='store', default=os.getenv('GITHUB_USER')) p.add_argument('--github-token', dest='gh_token', action='store', default=os.getenv('GITHUB_TOKEN')) p.add_argument('--dry-run', dest='dry_run', action='store_true', default=False) p.add_argument('--clobber', '-F', dest='clobber', action='store_true', default=False, help='Overwrite existing working copy for each repository') return p.parse_args() def main(): opts = get_args() os.chdir(opts.base_directory) client = create_client(username=opts.gh_user, token=opts.gh_token) for org_name in set(opts.organizations): # org.repositories is a lazy-loaded item, so we don't need to fetch all the info on the org org = GithubOrganization(client, org_name) managed_directories = set([]) print(' ORG: {org}'.format(org=org.name)) for repo in org.repositories: managed_directories.add(os.path.join(repo.owner.name, repo.name)) clone(repo, fake=opts.dry_run, clobber=opts.clobber) # list directories in {org.name} directory for directory_name in os.listdir(org.name): full_path = os.path.join(org.name, directory_name) if not os.path.isdir(full_path): continue if full_path in managed_directories: continue print('EXTRA: {directory}'.format(directory=full_path)) for username in set(opts.users): # user.repositories is a lazy-loaded item, so we don't need to fetch all the info on the person user = GithubUser(client, username) managed_directories = set([]) print(' USER: {user}'.format(user=user.name)) for repo in user.repositories: managed_directories.add(os.path.join(repo.owner.name, repo.name)) clone(repo, fake=opts.dry_run, clobber=opts.clobber) # list directories in {user.name} directory for directory_name in os.listdir(user.name): full_path = os.path.join(user.name, directory_name) if not os.path.isdir(full_path): continue if full_path in managed_directories: continue print('EXTRA: {directory}'.format(directory=full_path)) if __name__ == '__main__': main() PK!1KP P github_macros/cli/releases.pyimport os import re from github_macros.cli._base import MyParser, create_client from github_macros import __version__ def gh_client(): return create_client(os.getenv('GITHUB_USER'), os.getenv('GITHUB_TOKEN')) def get_args(): p = MyParser() p.add_argument('--version', '-v', action='version', help='Prints the program version and exits', version='%(prog)s ' + __version__) p.add_argument('--github-user', dest='gh_user', action='store', default=os.getenv('GITHUB_USER')) p.add_argument('--github-token', dest='gh_token', action='store', default=os.getenv('GITHUB_TOKEN')) p.add_argument('--prefix', dest='pfx', action='store', default='v', help='Version prefix applied to semver tags') p.add_argument('--pattern', dest='version_pattern', action='store', default='', help='A python-based regular expression for what pattern indicates a version tag') p.add_argument('--latest', dest='latest', action='store_true', default=False, help='Grab only the latest release') p.add_argument('repo', metavar='REPO', action='store', help='The target repository for which to find the latest version (e.g., "postmodern/chruby")') return p.parse_args() def get_releases(repo, client, pattern=r'^v\d+\.\d+\.\d+'): response = client.get('/repos/{}/releases'.format(repo), params={'per_page': 999}) response.raise_for_status() version_pattern = re.compile(pattern) # Only version tags unstable_version_pattern = re.compile(r'\d-?(rc\d*|a(lpha)?\d*|b(eta)?\d*)$') def is_final_release(release): if release['draft']: return False if release['prerelease']: return False if unstable_version_pattern.search(release['tag_name']): return False return True def is_version_tag(release): return bool(version_pattern.search(release['tag_name'])) return sorted( filter( lambda r: is_version_tag(r) and is_final_release(r), response.json() or [], ), key=lambda release: release['tag_name'], ) def show_release_info(release, client): response = client.get(release['assets_url'], params={'per_page': 999}) response.raise_for_status() assets = response.json() for asset in assets: print('{tag}\t{name}\t{url}'.format(tag=release['tag_name'], name=asset['name'], url=asset['browser_download_url'])) def main(): opts = get_args() client = create_client(username=opts.gh_user, token=opts.gh_token) if opts.version_pattern: version_pattern = opts.version_pattern else: version_pattern = '^{}'.format(opts.pfx) + r'\d+\.\d+\.\d+' if opts.latest: releases = get_releases(opts.repo, client, pattern=version_pattern) if len(releases) > 0: show_release_info(releases[-1], client) else: for release in get_releases(opts.repo, client, pattern=version_pattern): show_release_info(release, client) if __name__ == '__main__': main() PK!n@ %github_macros/cli/repo_permissions.pyimport os from github_macros.cli._base import MyParser, create_client from github_macros.models.github import GithubOrganization from github_macros import __version__ def get_args(): p = MyParser() p.add_argument('--version', '-v', action='version', help='Prints the program version and exits', version='%(prog)s ' + __version__) p.add_argument('--organization', '-o', dest='organization', action='store', default=None, required=True, help='GitHub organization whose repositories we will alter') p.add_argument('--team', '-t', dest='team', action='store', default=None, required=True, help='GitHub team slug (scoped to the given organization) for which to provide permissions') p.add_argument('--permission', '-p', dest='permission', action='store', default='write', choices=['read', 'write', 'admin'], help='GitHub repository permissions to grant the given team') p.add_argument('--github-user', dest='gh_user', action='store', default=os.getenv('GITHUB_USER')) p.add_argument('--github-token', dest='gh_token', action='store', default=os.getenv('GITHUB_TOKEN')) return p.parse_args() def team_from_name(client, org_name, team): resp = client.get('/orgs/{org}/teams'.format(org=org_name)) resp.raise_for_status() out = resp.json() for team_info in out: if team_info['slug'].lower() != team.lower(): continue return team_info raise 'Team name {team} not found for organization {org}'.format(team=repr(team), org=repr(org_name)) def main(): opts = get_args() client = create_client(username=opts.gh_user, token=opts.gh_token) client.headers.update({'Accept': 'application/vnd.github.swamp-thing-preview+json'}) perm = { 'read': 'pull', 'write': 'push', 'admin': 'admin', }[opts.permission] org_name = opts.organization assert org_name, 'Must provide a valid organization name in slug format' team = team_from_name(client, org_name, opts.team) assert team, 'Must provide a valid team name for the given organization' print('TEAM: @{org}/{team} ({_id})'.format(org=org_name, team=opts.team, _id=team['id'])) org = GithubOrganization(client, org_name) for repo in org.repositories: resp = client.put( '/teams/{team_id}/repos/{repo}'.format(repo=repo.full_name, team_id=team['id']), json={ 'permission': perm, }, ) resp.raise_for_status() # NOTE: Normally a status of 201 would indicate it was written to the server, but our GHE instance is buggy that way. print('REPO: {repo}'.format(repo=repo.full_name)) PK!u  github_macros/common.py_missing = object() # @see https://stackoverflow.com/a/17487613/1236035 class cached_property(object): # pragma: no cover """A decorator that converts a function into a lazy property. The function wrapped is called the first time to retrieve the result and then that calculated result is used the next time you access the value:: class Foo(object): @cached_property def foo(self): # calculate something important here return 42 The class has to have a `__dict__` in order for this property to work. """ # implementation detail: this property is implemented as non-data # descriptor. non-data descriptors are only invoked if there is no # entry with the same name in the instance's __dict__. this allows # us to completely get rid of the access function call overhead. If # one chooses to invoke __get__ by hand the property will still work # as expected because the lookup logic is replicated in __get__ for # manual invocation. def __init__(self, func, name=None, doc=None): self.__name__ = name or func.__name__ self.__module__ = func.__module__ self.__doc__ = doc or func.__doc__ self.func = func def __get__(self, obj, _type=None): if obj is None: return self value = obj.__dict__.get(self.__name__, _missing) if value is _missing: value = self.func(obj) obj.__dict__[self.__name__] = value return value PK!~,9gggithub_macros/http.pyfrom __future__ import print_function import os import requests class GithubHttp(requests.Session): """ Wrapper for a requests session object with our project-specific settings """ def __init__(self, username, token, *args, **kwargs): super(GithubHttp, self).__init__(*args, **kwargs) if os.getenv('GITHUB_DOMAIN', 'github.com') in ('api.github.com', 'github.com'): self.base_uri = 'https://api.github.com' else: self.base_uri = 'https://{domain}/api/v3'.format(domain=os.getenv('GITHUB_DOMAIN')) self.auth = (username, token) self.headers.update({'Accept': 'application/vnd.github.loki-preview+json', 'Content-Type': 'application/json', 'User-Agent': 'David Alexander: "Too lazy... Just script it..."'}) def prepare_request(self, request, **kwargs): if request.url.startswith('/'): # Insert our github.com api string as the base request.url = self.base_uri + request.url return super(GithubHttp, self).prepare_request(request, **kwargs) PK!Uv&& github_macros/models/__init__.pyfrom __future__ import print_function PK!dPcPPgithub_macros/models/github.py""" A read-only set of models in the active record pattern, reading from GitHub's API and transforming it into a pythonic set of objects. """ from github_macros.common import cached_property import dateutil.parser def parse_iso8601(stamp): return dateutil.parser.parse(stamp) def debug(msg): # print("=================> DEBUG: {}".format(msg)) pass class BaseGithubSerializer(object): ALLOWED_MAPS = [] # To be overridden in subclasses http = None name = None created_at = None # Datetime updated_at = None # Datetime def __init__(self, client, name, **kwargs): """ param:: client: An instance of `github_macros.http.GithubHttp` param:: name: The organization slug of the target """ self.http = client self.name = name self._set_props(**kwargs) def _set_props(self, **kwargs): for attr in self.ALLOWED_MAPS: if attr in kwargs: setattr(self, attr, kwargs[attr]) # Datetime serialization: for stamp_attr in ['created_at', 'updated_at', 'pushed_at']: if stamp_attr in kwargs: setattr(self, stamp_attr, parse_iso8601(kwargs[stamp_attr])) def refresh(self): # OVERRIDE raise NotImplementedError() @classmethod def fetch(cls, client, name): """ Easy, user-facing way to retrieve a record from the REST API """ out = cls(client, name) out.refresh() return out @classmethod def deserialize(cls, client, obj): """ Deserializes Github's JSON payload into a new instance of self """ name = obj.get('name') if 'name' in obj: del obj['name'] return cls(client=client, name=name, **obj) def serialize(self): """ Serializes self into JSON compatible with Github's API """ raise NotImplementedError() def __hash__(self): return hash(self.name) def __eq__(self, other): if '__hash__' not in dir(other): # All subclasses of this have a definition for `__hash__()` return False if not isinstance(other, type(self)) and not isinstance(self, type(other)): # one is not an instance (or sub-class) of the other return False return self.__hash__() == other.__hash__() class GithubOrganization(BaseGithubSerializer): ALLOWED_MAPS = ['login', 'id', 'company', 'blog', 'location', 'email', 'has_organization_projects', 'has_repository_projects', 'public_repos', 'public_gists', 'followers', 'following', 'total_private_repos', 'owned_private_repos', 'private_gists', 'collaborators', 'description', 'default_repository_permission', 'members_can_create_repositories', 'billing_email'] http = None name = None id = None login = None display_name = None description = None company = None blog = None location = None email = None billing_email = None has_organization_projects = False has_repository_projects = False public_repos = 0 public_gists = 0 followers = 0 following = 0 total_private_repos = 0 owned_private_repos = 0 private_gists = 0 collaborators = 0 default_repository_permission = 'read' members_can_create_repositories = False created_at = None # Datetime updated_at = None # Datetime def _set_props(self, **kwargs): if 'login' in kwargs: self.name = kwargs['login'] if 'name' in kwargs: self.display_name = kwargs['name'] super(GithubOrganization, self)._set_props(**kwargs) def refresh(self): resp = self.http.get('/orgs/{org}'.format(org=self.name), params={'per_page': 999}) resp.raise_for_status() out = resp.json() self._set_props(**out) # clear cache to re-fetch sub-resources if 'repositories' in self.__dict__: del self.__dict__['repositories'] if 'members' in self.__dict__: del self.__dict__['members'] @cached_property def repositories(self): resp = self.http.get('/orgs/{org}/repos'.format(org=self.name), params={'per_page': 999}) if resp.status_code == 404: return [] resp.raise_for_status() out = resp.json() return [GithubRepository.deserialize(self.http, repo) for repo in out] @cached_property def members(self): resp = self.http.get('/orgs/{org}/members'.format(org=self.name), params={'per_page': 999}) if resp.status_code == 404: return [] resp.raise_for_status() out = resp.json() return [GithubUser.deserialize(self.http, user) for user in out] def __str__(self): return 'Github Organization ({o})'.format(o=str(self.name)) class GithubUser(BaseGithubSerializer): # 1-to-1 mappings between JSON and object attributes: ALLOWED_MAPS = ['id', 'login', 'site_admin', 'company', 'email', 'location', 'bio'] http = None name = None id = None login = None display_name = None site_admin = False company = None email = None location = None bio = None def refresh(self): resp = self.http.get('/users/{u}'.format(u=self.name), params={'per_page': 999}) resp.raise_for_status() out = resp.json() self._set_props(**out) # clear cache to re-fetch sub-resources if 'repositories' in self.__dict__: del self.__dict__['repositories'] def _set_props(self, **kwargs): # Weird mappings to be consistent with other models, where `name` is the unique slug, # but here `name` is the person's first and last name (e.g., "David Alexander" not the # username) if 'login' in kwargs: self.name = kwargs['login'] if 'name' in kwargs: self.display_name = kwargs['name'] super(GithubUser, self)._set_props(**kwargs) @cached_property def repositories(self): resp = self.http.get('/users/{u}/repos'.format(u=self.name), params={'per_page': 999}) if resp.status_code == 404: return [] resp.raise_for_status() out = resp.json() return [GithubRepository.deserialize(self.http, repo) for repo in out] def __str__(self): return 'Github User ({u})'.format(u=str(self.name)) class GithubRepository(BaseGithubSerializer): # 1-to-1 mappings between JSON and object attributes: ALLOWED_MAPS = ['homepage', 'language', 'watchers', 'default_branch', 'full_name', 'fork', 'forks', 'stars', 'issues', 'open_issues', 'description', 'permissions'] http = None name = None description = None full_name = None owner = None homepage = None language = None url = None private = False fork = False stars = 0 watchers = 0 forks = 0 issues = 0 open_issues = 0 clone_url = None created_at = None # Datetime updated_at = None # Datetime pushed_at = None # Datetime def __init__(self, client, full_name, **kwargs): self.permissions = {} self.full_name = full_name name = full_name.split('/').pop() if 'name' in kwargs: del kwargs['name'] super(GithubRepository, self).__init__(client, name, **kwargs) def refresh(self): if not self.full_name: raise Exception('Requires that the `full_name` attribute be set') resp = self.http.get('/repos/{r}'.format(r=self.full_name), params={'per_page': 999}) resp.raise_for_status() out = resp.json() self._set_props(**out) def _set_props(self, **kwargs): if 'owner' in kwargs: owner = kwargs['owner'] or {} if 'type' not in owner: raise NotImplementedError('Unable to determine the type for repo owner') elif str(owner['type']).lower() == 'organization': self.owner = GithubOrganization.deserialize( client=self.http, obj=kwargs['owner'] or {} ) pass elif str(owner['type']).lower() == 'user': self.owner = GithubUser.deserialize( client=self.http, obj=kwargs['owner'] or {} ) else: raise NotImplementedError('Unable to determine the right model to use ' 'for deserializing type {t}'.format( t=kwargs['owner']['type']) ) if 'ssh_url' in kwargs: self.clone_url = kwargs['ssh_url'] elif 'git_url' in kwargs: self.clone_url = kwargs['git_url'] elif 'clone_url' in kwargs: self.clone_url = kwargs['clone_url'] if 'html_url' in kwargs: self.url = kwargs['html_url'] super(GithubRepository, self)._set_props(**kwargs) def __str__(self): return 'Github Repository ({o})'.format(o=str(self.full_name)) def __hash__(self): return hash(self.full_name) @cached_property def branches(self): if not self.full_name: raise Exception('Requires that the `full_name` attribute be set') resp = self.http.get('/repos/{r}/branches'.format(r=self.full_name), params={'per_page': 999}) if resp.status_code == 404: return [] resp.raise_for_status() out = resp.json() return [GithubBranch.deserialize(client=self.http, repository=self, obj=branch) for branch in out] # ======== # Metadata # ======== def can(self, perm): """ Permissions check for 'admin', 'push', or 'pull' for this repository with the logged-in user """ return bool(self.permissions.get(perm, False)) @property def can_admin(self): return self.can('admin') @property def can_push(self): return self.can('push') @property def can_pull(self): return self.can('pull') @property def is_fork(self): return self.fork @property def is_private(self): return self.private @property def is_public(self): return not self.private class GithubBranch(BaseGithubSerializer): ALLOWED_MAPS = [] http = None name = None repository = None protection = None # GithubBranchProtection def __init__(self, client, name, repository=None, repository_name=None, **kwargs): if repository: self.repository = repository else: self.repository = GithubRepository(client, repository_name) if 'name' in kwargs: del kwargs['name'] super(GithubBranch, self).__init__(client, name, **kwargs) def refresh(self): if not isinstance(self.repository, GithubRepository): raise Exception('Requires that the `repository` attribute be set') if not self.repository.full_name: self.repository.reload() resp = self.http.get('/repos/{r}/branches/{b}'.format(r=self.repository.full_name, b=self.name), params={'per_page': 999}) resp.raise_for_status() out = resp.json() self._set_props(**out) def _set_props(self, **kwargs): if 'protection' in kwargs: self.protection = GithubBranchProtection(self.http, self, **kwargs['protection']) elif not self.protection: self.protection = GithubBranchProtection(self.http, self) super(GithubBranch, self)._set_props(**kwargs) @classmethod def fetch(cls, client, name, repository=None, repository_name=None): if not repository: repository = GithubRepository.fetch(client, repository_name) out = cls(client, name, repository=repository) out.refresh() return out @classmethod def deserialize(cls, client, repository, obj): name = obj.get('name') if 'name' in obj: del obj['name'] return cls(client=client, name=name, repository=repository, **obj) def __hash__(self): if not self.repository.full_name: self.repository.reload() return hash((self.repository.full_name, self.name)) class GithubBranchProtection(BaseGithubSerializer): http = None branch = None enabled = None required_status_checks = False required_code_review = False up_to_date = False dismiss_stale_reviews = False except_admins = True contexts = None # [] dismissal_users = None # [] dismissal_teams = None # [] push_restrictions = False push_users = None # [] push_teams = None # [] def __init__(self, client, branch, **kwargs): self.http = client self.branch = branch self.contexts = [] self.dismissal_users = [] self.dismissal_teams = [] self.push_users = [] self.push_teams = [] self._set_props(**kwargs) # This is due to the preview API not containing all the info we need, so we need to # hit multiple endpoints and stitch them together debug('GRABBING MORE') self.__more() def _set_props(self, **kwargs): # APIs for branch protection are still in preview-mode only as of GHE v2.10, so we'll have # to do a lot of manual mapping and changing of names to remain consistent between the 3 # ways to view it: `/branches`, `/branches/{name}`, and `/branches/{name}/protection` super(GithubBranchProtection, self)._set_props(**kwargs) # Yes, this is ugly. I'm sorry. if 'enabled' in kwargs: debug('`enabled` => {}'.format(repr(kwargs['enabled']))) self.enabled = kwargs['enabled'] else: debug('`enabled` not found!') if 'required_status_checks' in kwargs: required_status_checks = kwargs['required_status_checks'] or {} if kwargs['required_status_checks'] is None: debug('`required_status_checks` => IS NULL') self.required_status_checks = False elif 'enforcement_level' in required_status_checks: debug('`required_status_checks.enforcement_level` => {}'.format(repr(required_status_checks['enforcement_level']))) self.enforce_admin = required_status_checks['enforcement_level'] == 'everyone' self.required_status_checks = required_status_checks['enforcement_level'] != 'off' else: debug('`required_status_checks.enforcement_level` not found!') self.required_status_checks = True if 'strict' in required_status_checks: debug('`required_status_checks.strict` => {}'.format(repr(required_status_checks['strict']))) self.up_to_date = required_status_checks['strict'] if self.up_to_date: self.required_status_checks = True else: debug('`required_status_checks.strict` not found!') if 'contexts' in required_status_checks: debug('`required_status_checks.contexts` => {}'.format(repr(required_status_checks['contexts']))) self.contexts = required_status_checks['contexts'] else: debug('`required_status_checks.contexts` not found!') else: debug('`required_status_checks` not found!') if 'restrictions' in kwargs: restrictions = kwargs['restrictions'] or {} self.push_restrictions = kwargs['restrictions'] is not None if 'users' in restrictions: debug('`restrictions.users` => {}'.format(repr(restrictions['users']))) self.push_users = restrictions['users'] else: debug('`restrictions.users` not found!') if 'teams' in restrictions: debug('`restrictions.teams` => {}'.format(repr(restrictions['teams']))) self.push_teams = restrictions['teams'] else: debug('`restrictions.teams` not found!') else: debug('`restrictions` not found!') if 'required_pull_request_reviews' in kwargs: required_pr_reviews = kwargs['required_pull_request_reviews'] or {} self.required_code_review = kwargs['required_pull_request_reviews'] is not None if 'dismiss_stale_reviews' in required_pr_reviews: debug('`required_pull_request_reviews.dismiss_stale_reviews` => {}'.format(repr(required_pr_reviews['dismiss_stale_reviews']))) self.dismiss_stale_reviews = required_pr_reviews['dismiss_stale_reviews'] else: debug('`required_pull_request_reviews.dismiss_stale_reviews` not found!') if 'dismissal_restrictions' in required_pr_reviews: dismiss_restrict = required_pr_reviews['dismissal_restrictions'] or {} if 'users' in dismiss_restrict: debug('`required_pull_request_reviews.dismissal_restrictions.users` => {}'.format(repr(dismiss_restrict['users']))) self.dismissal_users = dismiss_restrict['users'] else: debug('`required_pull_request_reviews.dismissal_restrictions.users` not found!') if 'teams' in dismiss_restrict: debug('`required_pull_request_reviews.dismissal_restrictions.teams` => {}'.format(repr(dismiss_restrict['teams']))) self.dismissal_teams = dismiss_restrict['teams'] else: debug('`required_pull_request_reviews.dismissal_restrictions.teams` not found!') else: debug('`required_pull_request_reviews.dismissal_restrictions` not found!') else: debug('`required_pull_request_reviews` not found!') if 'enforce_admins' in kwargs: enforce_admins = kwargs['enforce_admins'] or {} if 'enabled' in enforce_admins: debug('`enforce_admins.enabled` => {}'.format(repr(enforce_admins['enabled']))) self.except_admins = not enforce_admins['enabled'] else: debug('`enforce_admins.enabled` not found!') else: debug('`enforce_admins` not found!') if self.enabled is None: # Last resort debug('LAST RESORT REACHED. Assuming branch protection is disabled') self.enabled = False def refresh(self): if not isinstance(self.branch, GithubBranch): raise Exception('Requires that the `branch` attribute be set') if not isinstance(self.branch.repository, GithubRepository): raise Exception('Requires that the `repository` attribute be set on the instance of GithubBranch') if not self.branch.repository.full_name: self.branch.repository.reload() url = '/repos/{r}/branches/{b}'.format(r=self.branch.repository.full_name, b=self.branch.name) resp = self.http.get(url, params={'per_page': 999}) if resp.status_code == 404: return resp.raise_for_status() out = resp.json() self._set_props(**out) self.__more() def __more(self): if not isinstance(self.branch, GithubBranch): raise Exception('Requires that the `branch` attribute be set') if not isinstance(self.branch.repository, GithubRepository): raise Exception('Requires that the `repository` attribute be set on the instance of GithubBranch') if not self.branch.repository.full_name: self.branch.repository.reload() url = '/repos/{r}/branches/{b}/protection'.format(r=self.branch.repository.full_name, b=self.branch.name) resp = self.http.get(url, params={'per_page': 999}) if resp.status_code == 404: return resp.raise_for_status() out = resp.json() self._set_props(**out) def __hash__(self): if not self.branch.repository.full_name: self.branch.repository.reload() return hash((self.branch.repository.full_name, self.branch.name, 'protection')) PK!H|_h.github_macros-2.0.0.dist-info/entry_points.txtmA } ":L:2ܿbJ?ދ@;Imx/g\+[v\dlUXp^iFBc8ˣLtPK!HڽTU#github_macros-2.0.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hf?s&github_macros-2.0.0.dist-info/METADATA]O0+|ɤ:Y`^rH>w ]<9od%B+0d i@Z&F}~VDc[ }C5_F`=m\s>3-lCN\M5`҂aK*FWЀF zU7C^9*߭HiFQo2IAKr[6"=gtziʡ]̐k 6JCJ-!bR9(x[\i:\:rGsX;i5i7GMO}hoxocVMe7AW:'yG+PVuĠ4;8?y7+-h > 8|L&1|@qܛ 7PK!H?$github_macros-2.0.0.dist-info/RECORDIJ-wL2 ͆0J)PH ׉cҋ;''U0Tc7IC;A 8 U l(ϥTz[7La키f9=.yY ޙnે!!.竝(+UP |B LT5Gy R[YѕaB^|dވiYw=l}'s<6iMK |T]|*l#)}Q^r AA/ 2:lA9.{WI#-]|я{VP{\ 3i N ?ߩq)I8 _8S]w$C8SW)e0axA?ʢ}0?PK!2 |<<github_macros/__init__.pyPK!Uv&&sgithub_macros/cli/__init__.pyPK!\X:]]]github_macros/cli/_base.pyPK!0(/(/&igithub_macros/cli/branch_protection.pyPK!>-2github_macros/cli/refresh.pyPK!1KP P Egithub_macros/cli/releases.pyPK!n@ %-Rgithub_macros/cli/repo_permissions.pyPK!u  4]github_macros/common.pyPK!~,9ggscgithub_macros/http.pyPK!Uv&& hgithub_macros/models/__init__.pyPK!dPcPPqhgithub_macros/models/github.pyPK!H|_h.Kgithub_macros-2.0.0.dist-info/entry_points.txtPK!HڽTU#github_macros-2.0.0.dist-info/WHEELPK!Hf?s&github_macros-2.0.0.dist-info/METADATAPK!H?$mgithub_macros-2.0.0.dist-info/RECORDPKa