PK2ZL-vzzpoetry/__init__.pyfrom poetry.console import Application from .poetry import Poetry __version__ = Poetry.VERSION console = Application() PK}hL!poetry/__version__.py__version__ = '0.4.0' PKeVLj %%poetry/console/__init__.pyfrom .application import Application PKUgL&poetry/console/application.pyimport os from cleo import Application as BaseApplication from poetry.poetry import Poetry from poetry.utils.venv import Venv from .commands import AboutCommand from .commands import AddCommand from .commands import BuildCommand from .commands import ConfigCommand from .commands import InstallCommand from .commands import LockCommand from .commands import NewCommand from .commands import PublishCommand from .commands import RemoveCommand from .commands import ShowCommand from .commands import UpdateCommand class Application(BaseApplication): def __init__(self): super().__init__('Poetry', Poetry.VERSION) self._poetry = None self._venv = Venv.create() @property def poetry(self) -> Poetry: if self._poetry is not None: return self._poetry self._poetry = Poetry.create(os.getcwd()) return self._poetry def reset_poetry(self) -> None: self._poetry = None @property def venv(self) -> Venv: return self._venv def get_default_commands(self) -> list: commands = super(Application, self).get_default_commands() return commands + [ AboutCommand(), AddCommand(), BuildCommand(), ConfigCommand(), InstallCommand(), LockCommand(), NewCommand(), PublishCommand(), RemoveCommand(), ShowCommand(), UpdateCommand(), ] def do_run(self, i, o) -> int: if self._venv.is_venv() and o.is_verbose(): o.writeln(f'Using virtualenv: {self._venv.venv}') return super().do_run(i, o) PKMgL(Lbb#poetry/console/commands/__init__.pyfrom .about import AboutCommand from .add import AddCommand from .build import BuildCommand from .config import ConfigCommand from .install import InstallCommand from .lock import LockCommand from .new import NewCommand from .publish import PublishCommand from .remove import RemoveCommand from .show import ShowCommand from .update import UpdateCommand PKZL0  poetry/console/commands/about.pyfrom .command import Command class AboutCommand(Command): """ Short information about Poetry. about """ def handle(self): self.line("""Poetry - Package Management for Python Poetry is a dependency manager tracking local dependencies of your projects and libraries. See https://github.com/sdispater/poetry for more information. """) PKdL poetry/console/commands/add.pyimport re from typing import List from typing import Tuple from poetry.installation import Installer from poetry.semver.version_parser import VersionParser from poetry.version.version_selector import VersionSelector from .command import Command class AddCommand(Command): """ Add a new depdency to poetry.toml. add { name* : Packages to add. } {--D|dev : Add package as development dependency. } {--optional : Add as an optional dependency. } {--dry-run : Outputs the operations but will not execute anything (implicitly enables --verbose). } """ help = """The add command adds required packages to your poetry.toml and installs them. If you do not specify a version constraint, poetry will choose a suitable one based on the available package versions. """ def handle(self): packages = self.argument('name') is_dev = self.option('dev') section = 'dependencies' if is_dev: section = 'dev-dependencies' original_content = self.poetry.file.read() content = self.poetry.file.read() poetry_content = content['tool']['poetry'] for name in packages: for key in poetry_content[section]: if key.lower() == name.lower(): raise ValueError(f'Package {name} is already present') requirements = self._determine_requirements(packages) requirements = self._format_requirements(requirements) # validate requirements format parser = VersionParser() for constraint in requirements.values(): parser.parse_constraints(constraint) for name, constraint in requirements.items(): poetry_content[section][name] = constraint # Write new content self.poetry.file.write(content) # Cosmetic new line self.line('') # Update packages self.reset_poetry() installer = Installer( self.output, self.poetry.package, self.poetry.locker, self.poetry.pool ) installer.dry_run(self.option('dry-run')) installer.update(True) installer.whitelist(requirements) try: status = installer.run() except Exception: self.poetry.file.write(original_content) raise if status != 0 or self.option('dry-run'): # Revert changes if not self.option('dry-run'): self.error( '\n' 'Addition failed, reverting poetry.toml ' 'to its original content.' ) self.poetry.file.write(original_content) return status def _determine_requirements(self, requires: List[str]) -> List[str]: if not requires: return [] requires = self._parse_name_version_pairs(requires) result = [] for requirement in requires: if 'version' not in requirement: # determine the best version automatically name, version = self._find_best_version_for_package( requirement['name'] ) requirement['version'] = version requirement['name'] = name self.line( f'Using version {version} for {name}' ) else: # check that the specified version/constraint exists # before we proceed name, _ = self._find_best_version_for_package( requirement['name'], requirement['version'] ) requirement['name'] = name result.append(f'{requirement["name"]} {requirement["version"]}') return result def _find_best_version_for_package(self, name, required_version=None ) -> Tuple[str, str]: selector = VersionSelector(self.poetry.pool) package = selector.find_best_candidate(name, required_version) if not package: # TODO: find similar raise ValueError( f'Could not find a matching version of package {name}' ) return ( package.pretty_name, selector.find_recommended_require_version(package) ) def _parse_name_version_pairs(self, pairs: list) -> list: result = [] for i in range(len(pairs)): pair = re.sub('^([^=: ]+)[=: ](.*)$', '\\1 \\2', pairs[i].strip()) pair = pair.strip() if ' ' in pair: name, version = pair.split(' ', 2) result.append({ 'name': name, 'version': version }) else: result.append({ 'name': pair }) return result def _format_requirements(self, requirements: List[str]) -> dict: requires = {} requirements = self._parse_name_version_pairs(requirements) for requirement in requirements: requires[requirement['name']] = requirement['version'] return requires PK hL)0ZZ poetry/console/commands/build.pyfrom .command import Command from poetry.masonry import Builder class BuildCommand(Command): """ Builds a package, as a tarball and a wheel by default. build { --f|format=* : Limit the format to either wheel or sdist} """ def handle(self): fmt = 'all' if self.option('format'): fmt = self.option('format') package = self.poetry.package self.line(f'Building {package.pretty_name} ' f'({package.version})') builder = Builder(self.poetry, self.output) builder.build(fmt) PK$gLW"poetry/console/commands/command.pyfrom cleo import Command as BaseCommand from cleo.inputs import ListInput from poetry.poetry import Poetry from ..styles.poetry import PoetryStyle class Command(BaseCommand): @property def poetry(self) -> Poetry: return self.get_application().poetry def reset_poetry(self) -> None: self.get_application().reset_poetry() def call(self, name, options=None): """ Call another command. Fixing style being passed rather than an output """ if options is None: options = [] command = self.get_application().find(name) options = [('command', command.get_name())] + options return command.run(ListInput(options), self.output.output) def run(self, i, o) -> int: """ Initialize command. """ self.input = i self.output = PoetryStyle(i, o, self.get_application().venv) return super(BaseCommand, self).run(i, o) PK|hLw!poetry/console/commands/config.pyimport json import re from pathlib import Path from poetry.locations import CONFIG_DIR from poetry.toml import loads from .command import Command TEMPLATE = """[repositories] """ AUTH_TEMPLATE = """[http-basic] """ class ConfigCommand(Command): """ Sets/Gets config options. config { key : Setting key. } { value?* : Setting value. } { --list : List configuration settings } { --unset : Unset configuration setting } """ help = """This command allows you to edit the poetry config settings and repositories.. To add a repository: poetry repositories.foo https://bar.com/simple/ To remove a repository (repo is a short alias for repositories): poetry --unset repo.foo """ def __init__(self): super().__init__() self._config_file = None self._config = {} self._auth_config_file = None self._auth_config = {} def initialize(self, i, o): super().initialize(i, o) # Create config file if it does not exist self._config_file = Path(CONFIG_DIR) / 'config.toml' self._auth_config_file = Path(CONFIG_DIR) / 'auth.toml' if not self._config_file.exists(): self._config_file.parent.mkdir(parents=True, exist_ok=True) self._config_file.write_text(TEMPLATE) if not self._auth_config_file.exists(): self._auth_config_file.parent.mkdir(parents=True, exist_ok=True) self._auth_config_file.write_text(AUTH_TEMPLATE) with self._config_file.open() as f: self._config = loads(f.read()) with self._auth_config_file.open() as f: self._auth_config = loads(f.read()) def handle(self): if self.option('list'): self._list_configuration(self._config) return 0 setting_key = self.argument('key') if not setting_key: return 0 if self.argument('value') and self.option('unset'): raise RuntimeError('You can not combine a setting value with --unset') # show the value if no value is provided if not self.argument('value') and not self.option('unset'): m = re.match('^repos?(?:itories)?(?:\.(.+))?', self.argument('key')) if m: if not m.group(1): value = {} if 'repositories' in self._config: value = self._config['repositories'] else: if m.group(1) not in self._config['repositories']: raise ValueError( f'There is not {m.group(1)} repository defined' ) value = self._config['repositories'][m.group(1)] self.line(str(value)) return 0 values = self.argument('value') # handle repositories m = re.match('^repos?(?:itories)?(?:\.(.+))?', self.argument('key')) if m: if not m.group(1): raise ValueError('You cannot remove the [repositories] section') if self.option('unset'): if m.group(1) not in self._config['repositories']: raise ValueError(f'There is not {m.group(1)} repository defined') del self._config[m.group(1)] self._config_file.write_text(self._config.dumps()) return 0 if len(values) == 1: url = values[0] if m.group(1) in self._config['repositories']: self._config['repositories'][m.group(1)]['url'] = url else: self._config['repositories'][m.group(1)] = { 'url': url } self._config_file.write_text(self._config.dumps()) return 0 raise ValueError( 'You must pass the url. ' 'Example: poetry config repositories.foo https://bar.com' ) # handle auth m = re.match('^(http-basic)\.(.+)', self.argument('key')) if m: if self.option('unset'): if m.group(2) not in self._auth_config[m.group(1)]: raise ValueError( f'There is no {m.group(2)} {m.group(1)} defined' ) del self._auth_config[m.group(1)][m.group(2)] self._auth_config_file.write_text(self._auth_config.dumps()) return 0 if m.group(1) == 'http-basic': if len(values) == 1: username = values[0] # Only username, so we prompt for password password = self.secret('Password:') elif len(values) != 2: raise ValueError(f'Expected one or two arguments ' f'(username, password), got {len(values)}') else: username = values[0] password = values[1] self._auth_config[m.group(1)][m.group(2)] = { 'username': username, 'password': password } self._auth_config_file.write_text(self._auth_config.dumps()) return 0 raise ValueError(f'Setting {self.argument("key")} does not exist') def _list_configuration(self, contents, k=None): orig_k = k for key, value in contents.items(): if k is None and key not in ['config', 'repositories']: continue if isinstance(value, dict) or key == 'repositories' and k is None: if k is None: k = '' k += re.sub('^config\.', '', key + '.') self._list_configuration(value, k=k) k = orig_k continue if isinstance(value, list): value = [ json.dumps(val) if isinstance(val, list) else val for val in value ] value = f'[{", ".join(value)}]' value = json.dumps(value) self.line(f'[{(k or "") + key}] {value}') PKۀfLΎf"poetry/console/commands/install.pyfrom poetry.installation import Installer from poetry.repositories.pypi_repository import PyPiRepository from .command import Command class InstallCommand(Command): """ Installs the project dependencies. install { --no-dev : Do not install dev dependencies. } { --dry-run : Outputs the operations but will not execute anything (implicitly enables --verbose). } { --E|extras=* : Extra sets of dependencies to install (multiple values allowed). } """ help = """The install command reads the poetry.lock file from the current directory, processes it, and downloads and installs all the libraries and dependencies outlined in that file. If the file does not exist it will look for poetry.toml and do the same. poetry install """ def handle(self): installer = Installer( self.output, self.poetry.package, self.poetry.locker, self.poetry.pool ) installer.extras(self.option('extras')) installer.dev_mode(not self.option('no-dev')) installer.dry_run(self.option('dry-run')) return installer.run() PKJ\L玣opoetry/console/commands/lock.pyfrom poetry.installation import Installer from poetry.repositories.pypi_repository import PyPiRepository from .command import Command class LockCommand(Command): """ Locks the project dependencies. lock { --no-dev : Do not install dev dependencies. } """ help = """The lock command reads the poetry.toml file from the current directory, processes it, and locks the depdencies in the poetry.lock file. poetry lock """ def handle(self): installer = Installer( self.output, self.poetry.package, self.poetry.locker, self.poetry.pool ) installer.update(True) installer.execute_operations(False) return installer.run() PK[LRRpoetry/console/commands/new.pyfrom pathlib import Path from poetry.layouts import layout from .command import Command class NewCommand(Command): """ Creates a new Python project at new { path : The path to create the project at. } { --name : Set the resulting package name. } """ def handle(self): layout_ = layout('standard') path = Path.cwd() / Path(self.argument('path')) name = self.option('name') if not name: name = path.name if path.exists(): if list(path.glob('*')): # Directory is not empty. Aborting. raise RuntimeError( 'Destination {}' 'exists and is not empty'.format( path ) ) readme_format = 'rst' layout_ = layout_(name, '0.1.0', readme_format=readme_format) layout_.create(path) self.line( 'Created package {} in {}' .format(name, path.relative_to(Path.cwd())) ) PKuhLp*U"poetry/console/commands/publish.pyfrom poetry.masonry.publishing.publisher import Publisher from .command import Command class PublishCommand(Command): """ Publishes a package to a remote repository. publish { --r|repository= : The repository to publish the package to. } { --no-build : Do not build the package before publishing. } """ help = """The publish command builds and uploads the package to a remote repository. By default, it will upload to PyPI but if you pass the --repository option it will upload to it instead. The --repository option should match the name of a configured repository using the config command. """ def handle(self): # Building package first, unless told otherwise if not self.option('no-build'): self.call('build') self.line('') publisher = Publisher(self.poetry, self.output) publisher.publish(self.option('repository')) PK̦dLLI I !poetry/console/commands/remove.pyfrom poetry.installation import Installer from .command import Command class RemoveCommand(Command): """ Removes a package from the project dependencies. remove { packages* : Packages that should be removed. } {--D|dev : Removes a package from the development dependencies. } {--dry-run : Outputs the operations but will not execute anything (implicitly enables --verbose). } """ help = """The remove command removes a package from the current list of installed packages poetry remove""" def handle(self): packages = self.argument('packages') is_dev = self.option('dev') original_content = self.poetry.file.read() content = self.poetry.file.read() poetry_content = content['tool']['poetry'] section = 'dependencies' if is_dev: section = 'dev-dependencies' # Deleting entries requirements = {} for name in packages: found = False for key in poetry_content[section]: if key.lower() == name.lower(): found = True requirements[name] = poetry_content[section][name] break if not found: raise ValueError(f'Package {name} not found') for key in requirements: del poetry_content[section][key] # Write the new content back self.poetry.file.write(content) # Update packages self.reset_poetry() installer = Installer( self.output, self.poetry.package, self.poetry.locker, self.poetry.pool ) installer.dry_run(self.option('dry-run')) installer.update(True) installer.whitelist(requirements) try: status = installer.run() except Exception: self.poetry.file.write(original_content) raise if status != 0 or self.option('dry-run'): # Revert changes if not self.option('dry-run'): self.error( '\n' 'Removal failed, reverting poetry.toml ' 'to its original content.' ) self.poetry.file.write(original_content) return status PKTgLaL]!]!poetry/console/commands/show.pyfrom poetry.semver import statisfies from poetry.version.version_selector import VersionSelector from .command import Command class ShowCommand(Command): """ Shows information about packages. show { package? : Package to inspect. } { --t|tree : List the dependencies as a tree. } { --l|latest : Show the latest version. } { --o|outdated : Show the latest version but only for packages that are outdated. } """ help = """The show command displays detailed information about a package, or lists all packages available.""" colors = [ 'green', 'yellow', 'cyan', 'magenta', 'blue', ] def handle(self): package = self.argument('package') if self.option('tree'): self.init_styles() if self.option('outdated'): self.input.set_option('latest', True) installed_repo = self.poetry.locker.locked_repository(True) # Show tree view if requested if self.option('tree') and not package: requires = self.poetry.package.requires + self.poetry.package.dev_requires packages = installed_repo.packages for package in packages: for require in requires: if package.name == require.name: self.display_package_tree(package, installed_repo) break return 0 table = self.table(style='compact') table.get_style().set_vertical_border_char('') locked_packages = installed_repo.packages if package: pkg = None for locked in locked_packages: if package.lower() == locked.name: pkg = locked break if not pkg: raise ValueError(f'Package {package} not found') if self.option('tree'): self.display_package_tree(pkg, installed_repo) return 0 rows = [ ['name', f' : {pkg.pretty_name}'], ['version', f' : {pkg.pretty_version}'], ['description', f' : {pkg.description}'], ] table.add_rows(rows) table.render() if pkg.requires: self.line('') self.line('dependencies') for dependency in pkg.requires: self.line(f' - {dependency.pretty_name} ' f'{dependency.pretty_constraint}') return 0 show_latest = self.option('latest') terminal = self.get_application().terminal width = terminal.width name_length = version_length = latest_length = 0 latest_packages = {} # Computing widths for locked in locked_packages: name_length = max(name_length, len(locked.pretty_name)) version_length = max(version_length, len(locked.full_pretty_version)) if show_latest: latest = self.find_latest_package(locked) if not latest: latest = locked latest_packages[locked.pretty_name] = latest latest_length = max(latest_length, len(latest.full_pretty_version)) write_version = name_length + version_length + 3 <= width write_latest = name_length + version_length + latest_length + 3 <= width write_description = name_length + version_length + latest_length + 24 <= width for locked in locked_packages: line = f'{locked.pretty_name:{name_length}}' if write_version: line += f' {locked.full_pretty_version:{version_length}}' if show_latest and write_latest: latest = latest_packages[locked.pretty_name] update_status = self.get_update_status(latest, locked) color = 'green' if update_status == 'semver-safe-update': color = 'red' elif update_status == 'update-possible': color = 'yellow' line += f' {latest.version:{latest_length}}' if self.option('outdated') and update_status == 'up-to-date': continue if write_description: description = locked.description remaining = width - name_length - version_length - 4 if show_latest: remaining -= latest_length if len(locked.description) > remaining: description = description[:remaining-3] + '...' line += ' ' + description self.line(line) def display_package_tree(self, package, installed_repo): self.write(f'{package.pretty_name}') self.line(f' {package.pretty_version} {package.description}') dependencies = package.requires dependencies = sorted(dependencies, key=lambda x: x.name) tree_bar = '├' j = 0 total = len(dependencies) for dependency in dependencies: j += 1 if j == total: tree_bar = '└' level = 1 color = self.colors[level] info = f'{tree_bar}── <{color}>{dependency.name} ' \ f'{dependency.pretty_constraint}' self._write_tree_line(info) tree_bar = tree_bar.replace('└', ' ') packages_in_tree = [package.name, dependency.name] self._display_tree( dependency, installed_repo, packages_in_tree, tree_bar, level + 1 ) def _display_tree(self, dependency, installed_repo, packages_in_tree, previous_tree_bar='├', level=1): previous_tree_bar = previous_tree_bar.replace('├', '│') dependencies = [] for package in installed_repo.packages: if package.name == dependency.name: dependencies = package.requires break dependencies = sorted(dependencies, key=lambda x: x.name) tree_bar = previous_tree_bar + ' ├' i = 0 total = len(dependencies) for dependency in dependencies: i += 1 current_tree = packages_in_tree if i == total: tree_bar = previous_tree_bar + ' └' color_ident = level % len(self.colors) color = self.colors[color_ident] circular_warn = '' if dependency.name in current_tree: circular_warn = '(circular dependency aborted here)' info = f'{tree_bar}── <{color}>{dependency.name} ' \ f'{dependency.pretty_constraint} {circular_warn}' self._write_tree_line(info) tree_bar = tree_bar.replace('└', ' ') if dependency.name not in current_tree: current_tree.append(dependency.name) self._display_tree( dependency, installed_repo, current_tree, tree_bar, level + 1 ) def _write_tree_line(self, line): if not self.output.is_decorated(): line = line.replace('└', '`-') line = line.replace('├', '|-') line = line.replace('──', '-') line = line.replace('│', '|') self.line(line) def init_styles(self): for color in self.colors: self.set_style(color, color) def find_latest_package(self, package): # find the latest version allowed in this pool if package.source_type == 'git': return name = package.name selector = VersionSelector(self.poetry.pool) return selector.find_best_candidate(name, f'>={package.version}') def get_update_status(self, latest, package): if latest.full_pretty_version == package.full_pretty_version: return 'up-to-date' constraint = package.version if latest.version and statisfies(latest.version, constraint): # It needs an immediate semver-compliant upgrade return 'semver-safe-update' # it needs an upgrade but has potential BC breaks so is not urgent return 'update-possible' PKR\L!_!!poetry/console/commands/update.pyfrom poetry.installation import Installer from poetry.repositories.pypi_repository import PyPiRepository from .command import Command class UpdateCommand(Command): """ Update dependencies as according to the poetry.toml file. update { packages?* : The packages to update. } { --no-dev : Do not install dev dependencies. } { --dry-run : Outputs the operations but will not execute anything (implicitly enables --verbose). } """ def handle(self): packages = self.argument('packages') installer = Installer( self.output, self.poetry.package, self.poetry.locker, self.poetry.pool ) if packages: installer.whitelist({name: '*' for name in packages}) installer.dev_mode(not self.option('no-dev')) installer.dry_run(self.option('dry-run')) # Force update installer.update(True) return installer.run() PKȺZL!poetry/console/styles/__init__.pyPK:gLl\poetry/console/styles/poetry.pyfrom cleo.styles import CleoStyle from cleo.styles import OutputStyle class PoetryStyle(CleoStyle): def __init__(self, i, o, venv): self._venv = venv super().__init__(i, o) self.output.get_formatter().add_style('warning', 'black', 'yellow') self.output.get_formatter().add_style('question', 'blue') @property def venv(self): return self._venv def writeln(self, messages, type=OutputStyle.OUTPUT_NORMAL, verbosity=OutputStyle.VERBOSITY_NORMAL): if self.output.verbosity >= verbosity: super().writeln(messages, type=type) def write(self, messages, newline=False, type=OutputStyle.OUTPUT_NORMAL, verbosity=OutputStyle.VERBOSITY_NORMAL): if self.output.verbosity >= verbosity: super().write(messages, newline=newline, type=type) PKZL!!poetry/installation/__init__.pyfrom .installer import Installer PKb\L, %poetry/installation/base_installer.pyclass BaseInstaller: def install(self, package): raise NotImplementedError def update(self, source, target): raise NotImplementedError def remove(self, package): raise NotImplementedError PKQgL@8f:: poetry/installation/installer.pyimport sys from typing import List from poetry.packages import Dependency from poetry.packages import Locker from poetry.packages import Package from poetry.puzzle import Solver from poetry.puzzle.operations import Install from poetry.puzzle.operations import Uninstall from poetry.puzzle.operations import Update from poetry.puzzle.operations.operation import Operation from poetry.repositories import Pool from poetry.repositories import Repository from poetry.repositories.installed_repository import InstalledRepository from poetry.semver.constraints import Constraint from poetry.semver.version_parser import VersionParser from .base_installer import BaseInstaller from .pip_installer import PipInstaller class Installer: def __init__(self, io, package: Package, locker: Locker, pool: Pool): self._io = io self._package = package self._locker = locker self._pool = pool self._dry_run = False self._update = False self._verbose = False self._write_lock = True self._dev_mode = True self._execute_operations = True self._whitelist = {} self._extras = [] self._installer = self._get_installer() @property def installer(self): return self._installer def run(self): # Force update if there is no lock file present if not self._update and not self._locker.is_locked(): self._update = True if self.is_dry_run(): self.verbose(True) self._write_lock = False self._execute_operations = False local_repo = Repository() self._do_install(local_repo) return 0 def dry_run(self, dry_run=True) -> 'Installer': self._dry_run = dry_run return self def is_dry_run(self) -> bool: return self._dry_run def verbose(self, verbose=True) -> 'Installer': self._verbose = verbose return self def is_verbose(self) -> bool: return self._verbose def dev_mode(self, dev_mode=True) -> 'Installer': self._dev_mode = dev_mode return self def is_dev_mode(self) -> bool: return self._dev_mode def update(self, update=True) -> 'Installer': self._update = update return self def is_updating(self) -> bool: return self._update def execute_operations(self, execute=True) -> 'Installer': self._execute_operations = execute return self def whitelist(self, packages: dict) -> 'Installer': self._whitelist = packages return self def extras(self, extras: list) -> 'Installer': self._extras = extras return self def _do_install(self, local_repo): locked_repository = Repository() # initialize locked repo if we are installing from lock if not self._update or self._locker.is_locked(): locked_repository = self._locker.locked_repository(True) if self._update: # Checking extras for extra in self._extras: if extra not in self._package.extras: raise ValueError(f'Extra [{extra}] is not specified.') self._io.writeln('Updating dependencies') fixed = [] # If the whitelist is enabled, packages not in it are fixed # to the version specified in the lock if self._whitelist: # collect packages to fixate from root requirements candidates = [] for package in locked_repository.packages: candidates.append(package) # fix them to the version in lock if they are not updateable for candidate in candidates: to_fix = True for require in self._whitelist.keys(): if require == candidate.name: to_fix = False if to_fix: fixed.append( Dependency(candidate.name, candidate.version) ) solver = Solver( self._package, self._pool, locked_repository, self._io ) request = self._package.requires request += self._package.dev_requires ops = solver.solve(request, fixed=fixed) else: self._io.writeln('Installing dependencies from lock file') if not self._locker.is_fresh(): self._io.writeln( '' 'Warning: The lock file is not up to date with ' 'the latest changes in pyproject.toml. ' 'You may be getting outdated dependencies. ' 'Run update to update them.' '' ) for extra in self._extras: if extra not in self._locker.lock_data.get('extras', {}): raise ValueError(f'Extra [{extra}] is not specified.') # If we are installing from lock # Filter the operations by comparing it with what is # currently installed ops = self._get_operations_from_lock(locked_repository) self._populate_local_repo(local_repo, ops, locked_repository) # We need to filter operations so that packages # not compatible with the current system, # or optional and not requested, are dropped self._filter_operations(ops, local_repo) self._io.new_line() # Execute operations if not ops and (self._execute_operations or self._dry_run): self._io.writeln('Nothing to install or update') if ops and (self._execute_operations or self._dry_run): installs = [] updates = [] uninstalls = [] for op in ops: if op.job_type == 'install': installs.append( f'{op.package.pretty_name}' f':{op.package.full_pretty_version}' ) elif op.job_type == 'update': updates.append( f'{op.target_package.pretty_name}' f':{op.target_package.full_pretty_version}' ) elif op.job_type == 'uninstall': uninstalls.append( f'{op.package.pretty_name}' ) self._io.new_line() self._io.writeln( 'Package operations: ' f'{len(installs)} install{"" if len(installs) == 1 else "s"}, ' f'{len(updates)} update{"" if len(updates) == 1 else "s"}, ' f'{len(uninstalls)} removal{"" if len(uninstalls) == 1 else "s"}' f'' ) self._io.new_line() # Writing lock before installing if self._update and self._write_lock: updated_lock = self._locker.set_lock_data( self._package, local_repo.packages ) if updated_lock: self._io.writeln('Writing lock file') self._io.writeln('') for op in ops: self._execute(op) def _execute(self, operation: Operation) -> None: """ Execute a given operation. """ method = operation.job_type getattr(self, f'_execute_{method}')(operation) def _execute_install(self, operation: Install) -> None: if operation.skipped: if self._io.is_verbose() and (self._execute_operations or self.is_dry_run()): self._io.writeln( f' - Skipping {operation.package.pretty_name} ' f'({operation.package.full_pretty_version}) ' f'{operation.skip_reason}') return if self._execute_operations or self.is_dry_run(): self._io.writeln( f' - Installing {operation.package.pretty_name} ' f'({operation.package.full_pretty_version})' ) if not self._execute_operations: return self._installer.install(operation.package) def _execute_update(self, operation: Update) -> None: source = operation.initial_package target = operation.target_package if operation.skipped: if self._io.is_verbose() and (self._execute_operations or self.is_dry_run()): self._io.writeln( f' - Skipping {target.pretty_name} ' f'({target.full_pretty_version}) ' f'{operation.skip_reason}') return if self._execute_operations or self.is_dry_run(): self._io.writeln( f' - Updating {target.pretty_name} ' f'({source.pretty_version}' f' -> {target.pretty_version})' ) if not self._execute_operations: return self._installer.update(source, target) def _execute_uninstall(self, operation: Uninstall) -> None: if self._execute_operations or self.is_dry_run(): self._io.writeln( f' - Removing {operation.package.pretty_name} ' f'({operation.package.full_pretty_version})' ) if not self._execute_operations: return self._installer.remove(operation.package) def _populate_local_repo(self, local_repo, ops, locked_repository): # Add all locked packages from the lock and go from there for package in locked_repository.packages: local_repo.add_package(package) # Now, walk through all operations and add/remove/update accordingly for op in ops: if isinstance(op, Update): package = op.target_package else: package = op.package acted_on = False for pkg in local_repo.packages: if pkg.name == package.name: # The package we operate on is in the local repo if op.job_type == 'update': if pkg.version == package.version: break local_repo.remove_package(pkg) local_repo.add_package(op.target_package) elif op.job_type == 'uninstall': local_repo.remove_package(op.package) acted_on = True if not acted_on: local_repo.add_package(package) def _get_operations_from_lock(self, locked_repository: Repository ) -> List[Operation]: installed_repo = InstalledRepository.load(self._io.venv) ops = [] extra_packages = [ p.name for p in self._get_extra_packages(locked_repository) ] for locked in locked_repository.packages: is_installed = False for installed in installed_repo.packages: if locked.name == installed.name: is_installed = True if locked.category == 'dev' and not self.is_dev_mode(): ops.append(Uninstall(locked)) elif locked.optional and locked.name not in extra_packages: # Installed but optional and not requested in extras ops.append(Uninstall(locked)) elif locked.version != installed.version: ops.append(Update( installed, locked )) if not is_installed: # If it's optional and not in required extras # we do not install if locked.optional and locked.name not in extra_packages: continue ops.append(Install(locked)) return ops def _filter_operations(self, ops: List[Operation], repo: Repository) -> None: extra_packages = [p.name for p in self._get_extra_packages(repo)] for op in ops: if isinstance(op, Update): package = op.target_package else: package = op.package if op.job_type == 'uninstall': continue parser = VersionParser() python = '.'.join([str(i) for i in sys.version_info[:3]]) if 'python' in package.requirements: python_constraint = parser.parse_constraints( package.requirements['python'] ) if not python_constraint.matches(Constraint('=', python)): # Incompatible python versions op.skip('Not needed for the current python version') continue if self._update: extras = {} for extra, deps in self._package.extras.items(): extras[extra] = [dep.name for dep in deps] else: extras = {} for extra, deps in self._locker.lock_data.get('extras', {}).items(): extras[extra] = [dep.lower() for dep in deps] # If a package is optional and not requested # in any extra we skip it if package.optional: if package.name not in extra_packages: op.skip('Not required') def _get_extra_packages(self, repo): """ Returns all packages required by extras. Maybe we just let the solver handle it? """ if self._update: extras = { k: [d.name for d in v] for k, v in self._package.extras.items() } else: extras = self._locker.lock_data.get('extras', {}) extra_packages = [] for extra_name, packages in extras.items(): if extra_name not in self._extras: continue extra_packages += [Dependency(p, '*') for p in packages] def _extra_packages(packages): pkgs = [] for package in packages: for pkg in repo.packages: if pkg.name == package.name: pkgs.append(package) pkgs += _extra_packages(pkg.requires) break return pkgs return _extra_packages(extra_packages) def _get_installer(self) -> BaseInstaller: return PipInstaller(self._io.venv, self._io) PK_fL"%poetry/installation/noop_installer.pyfrom .base_installer import BaseInstaller class NoopInstaller(BaseInstaller): def __init__(self): self._installs = [] self._updates = [] self._removals = [] @property def installs(self): return self._installs @property def updates(self): return self._updates @property def removals(self): return self._removals def install(self, package) -> None: self._installs.append(package) def update(self, source, target) -> None: self._updates.append((source, target)) def remove(self, package) -> None: self._removals.append(package) PKaiaL$poetry/installation/pip_installer.pyfrom poetry.utils.venv import Venv from .base_installer import BaseInstaller class PipInstaller(BaseInstaller): def __init__(self, venv: Venv, io): self._venv = venv self._io = io def install(self, package): args = ['install', self.requirement(package), '--no-deps'] if package.source_type == 'legacy' and package.source_url: args += ['--index-url', package.source_url] self.run(*args) def update(self, source, target): args = ['install', self.requirement(target), '--no-deps', '-U'] if target.source_type == 'legacy' and target.source_url: args += ['--index-url', target.source_url] self.run('install', self.requirement(target), '--no-deps', '-U') def remove(self, package): self.run('uninstall', package.name, '-y') def run(self, *args) -> str: return self._venv.run('pip', *args) def requirement(self, package) -> str: if package.source_type == 'git': return f'git+{package.source_url}@{package.source_reference}' \ f'#egg={package.name}' return f'{package.name}=={package.version}' PK\LYpoetry/io/__init__.pyfrom .null_io import NullIO PKOZgL.ddpoetry/io/null_io.pyfrom poetry.console.styles.poetry import PoetryStyle from poetry.utils.venv import Venv class NullVenv(Venv): def __init__(self): super().__init__() self.executed = [] def run(self, bin: str, *args): self.executed.append([bin] + list(args)) def _bin(self, bin): return bin class NullIO(PoetryStyle): def __init__(self): self._venv = NullVenv() @property def venv(self) -> NullVenv: return self._venv def is_quiet(self) -> bool: return False def is_verbose(self) -> bool: return False def is_very_verbose(self) -> bool: return False def is_debug(self) -> bool: return False def writeln(self, *args, **kwargs): pass def write(self, *args, **kwargs): pass def new_line(self, *args, **kwargs): pass PKL[L7Dwpoetry/layouts/__init__.pyfrom typing import Type from .layout import Layout from .standard import StandardLayout _LAYOUTS = { 'standard': StandardLayout } def layout(name: str) -> Type[Layout]: if name not in _LAYOUTS: raise ValueError('Invalid layout') return _LAYOUTS[name] PK2QfL g poetry/layouts/layout.pyimport toml from poetry.utils.helpers import module_name from poetry.vcs.git import Git TESTS_DEFAULT = """from {package_name} import __version__ def test_version(): assert '{version}' == __version__ """ class Layout(object): def __init__(self, project, version='0.1.0', readme_format='md', author=None): self._project = project self._package_name = module_name(project) self._version = version self._readme_format = readme_format self._dependencies = {} self._dev_dependencies = {} self._include = [] self._git = Git() git_config = self._git.config if not author: if ( git_config.get('user.name') and git_config.get('user.email') ): author = '{} <{}>'.format( git_config['user.name'], git_config['user.email'] ) else: author = 'Your Name ' self._author = author def create(self, path, with_tests=True): self._dependencies = {} self._dev_dependencies = {} self._include = [] path.mkdir(parents=True, exist_ok=True) self._create_default(path) self._create_readme(path) if with_tests: self._create_tests(path) self._write_poetry(path) def _create_default(self, path, src=True): raise NotImplementedError() def _create_readme(self, path): if self._readme_format == 'rst': readme_file = path / 'README.rst' else: readme_file = path / 'README.md' readme_file.touch() def _create_tests(self, path): self._dev_dependencies['pytest'] = '^3.0' tests = path / 'tests' tests_init = tests / '__init__.py' tests_default = tests / 'test_{}.py'.format(self._package_name) tests.mkdir() tests_init.touch(exist_ok=False) with tests_default.open('w') as f: f.write( TESTS_DEFAULT.format( package_name=self._package_name, version=self._version ) ) def _write_poetry(self, path): output = { 'tool': { 'poetry': { 'name': self._project, 'version': self._version, 'authors': [self._author], } } } content = toml.dumps(output, preserve=True) output = { 'tool': { 'poetry': { 'dependencies': {}, 'dev-dependencies': { 'pytest': '^3.4' } } } } content += '\n' + toml.dumps(output, preserve=True) poetry = path / 'pyproject.toml' with poetry.open('w') as f: f.write(content) PK[Lpoetry/layouts/standard.py# -*- coding: utf-8 -*- from .layout import Layout DEFAULT = """__version__ = '{version}' """ class StandardLayout(Layout): def _create_default(self, path): package_path = path / self._package_name package_init = package_path / '__init__.py' package_path.mkdir() with package_init.open('w') as f: f.write(DEFAULT.format(version=self._version)) PKrZVLv~ܩpoetry/locations.py# -*- coding: utf-8 -*- from pip.utils.appdirs import user_cache_dir, user_config_dir CACHE_DIR = user_cache_dir('pypoetry') CONFIG_DIR = user_config_dir('pypoetry') PK2QfLΨDpoetry/masonry/__init__.py""" This module handles the packaging and publishing of python projects. A lot of the code used here has been taken from `flit `__ and adapted to work with the poetry codebase, so kudos to them for showing the way. """ from .builder import Builder PK4gL*Upoetry/masonry/__publisher.pyimport hashlib import toml import requests from pathlib import Path from poetry.locations import CONFIG_DIR from poetry.semver.constraints import Constraint from poetry.semver.constraints import MultiConstraint from .builders.builder import Builder class Publisher: def __init__(self, poetry, io): self._poetry = poetry self._package = poetry.package self._io = io def publish(self, repository_name): if repository_name: self._io.writeln( f'Publishing {self._package.pretty_name} ' f'({self._package.pretty_version}) ' f'to {repository_name}' ) else: self._io.writeln( f'Publishing {self._package.pretty_name} ' f'({self._package.pretty_version}) ' f'to PyPI' ) if not repository_name: url = 'https://upload.pypi.org/legacy/' else: # Retrieving config information config_file = Path(CONFIG_DIR) / 'config.toml' if not config_file.exists(): raise RuntimeError( 'Config file does not exist. ' 'Unable to get repository information' ) with config_file.open() as f: config = toml.loads(f.read()) if ( 'repositories' not in config or repository_name not in config['repositories'] ): raise RuntimeError( f'Repository {repository_name} is not defined' ) url = config['repositories'][repository_name]['url'] username = None password = None auth_file = Path(CONFIG_DIR) / 'auth.toml' if not auth_file.exists(): # No auth file, we will ask for info later auth_config = {} else: with auth_file.open() as f: auth_config = toml.loads(f.read()) if 'http-basic' in auth_config and repository_name in auth_config['http-basic']: config = auth_config['http-basic'][repository_name] username = config.get('username') password = config.get('password') return self.upload(url, username=username, password=password) def upload(self, url, username=None, password=None): data = self.build_post_data('file_upload') def upload_file(self, file, url, username, password): data = self.build_post_data('file_upload') data['protocol_version'] = '1' if file.suffix == '.whl': data['filetype'] = 'bdist_wheel' py2_support = self._package.python_constraint.matches( MultiConstraint([ Constraint('>=', '2.0.0'), Constraint('<', '3.0.0') ]) ) data['pyversion'] = ('py2.' if py2_support else '') + 'py3' else: data['filetype'] = 'sdist' with file.open('rb') as f: content = f.read() files = {'content': (file.name, content)} data['md5_digest'] = hashlib.md5(content).hexdigest() log.info('Uploading %s...', file) resp = requests.post(repo['url'], data=data, files=files, auth=(repo['username'], repo['password']), ) resp.raise_for_status() def build_post_data(self, action): builder = Builder(self._poetry, self._io) d = { ":action": action, "name": self._package.name, "version": self._package.version, # additional meta-data "metadata_version": '1.2', "summary": self._package.description, "home_page": self._package.homepage or self._package.repository_url, "author": self._package.author_name, "author_email": self._package.author_email, "maintainer": self._package.author_name, "maintainer_email": self._package.author_email, "license": self._package.license, "description": self._package.readme, "keywords": ','.join(self._package.keywords), "platform": None if self._package.platform == '*' else self._package.platform, "classifiers": builder.get_classifers(), "download_url": None, "supported_platform": None if self._package.platform == '*' else self._package.platform, "project_urls": [], "provides_dist": [], "obsoletes_dist": [], "requires_dist": [d.to_pep_508() for d in self._package.requires], "requires_external": [], "requires_python": builder.convert_python_version(), } return {k: v for k, v in d.items() if v} PKLgL\poetry/masonry/api.py""" PEP-517 compliant buildsystem API """ import logging from pathlib import Path from poetry import Poetry from poetry.io import NullIO from .builders import SdistBuilder from .builders import WheelBuilder log = logging.getLogger(__name__) # PEP 517 specifies that the CWD will always be the source tree poetry = Poetry.create('.') def get_requires_for_build_wheel(config_settings=None): """ Returns a list of requirements for building, as strings """ main, extras = SdistBuilder.convert_dependencies(poetry.package.requires) return main + extras # For now, we require all dependencies to build either a wheel or an sdist. get_requires_for_build_sdist = get_requires_for_build_wheel def build_wheel(wheel_directory, config_settings=None, metadata_directory=None): """Builds a wheel, places it in wheel_directory""" info = WheelBuilder.make_in(poetry, NullIO(), Path(wheel_directory)) return info.file.name def build_sdist(sdist_directory, config_settings=None): """Builds an sdist, places it in sdist_directory""" path = SdistBuilder(poetry, NullIO()).build(Path(sdist_directory)) return path.name PKgL ddpoetry/masonry/builder.pyfrom poetry.semver.constraints import MultiConstraint from .builders import CompleteBuilder from .builders import SdistBuilder from .builders import WheelBuilder class Builder: _FORMATS = { 'sdist': SdistBuilder, 'wheel': WheelBuilder, 'all': CompleteBuilder } def __init__(self, poetry, io): self._poetry = poetry self._io = io def build(self, fmt: str): if fmt not in self._FORMATS: raise ValueError(f'Invalid format: {fmt}') self.check() builder = self._FORMATS[fmt](self._poetry, self._io) return builder.build() def check(self) -> None: package = self._poetry.package # Checking for disjunctive python versions if isinstance(package.python_constraint, MultiConstraint): if package.python_constraint.is_disjunctive(): raise RuntimeError( 'Disjunctive python versions are not yet supported ' 'when building packages. Rewrite your python requirements ' 'in a conjunctive way.' ) PK2QfLi>$ff#poetry/masonry/builders/__init__.pyfrom .complete import CompleteBuilder from .sdist import SdistBuilder from .wheel import WheelBuilder PKhL֯"poetry/masonry/builders/builder.pyimport os import re from collections import defaultdict from pathlib import Path from poetry.semver.constraints import Constraint from poetry.semver.constraints import MultiConstraint from poetry.semver.version_parser import VersionParser from poetry.vcs import get_vcs from ..utils.module import Module AUTHOR_REGEX = re.compile('(?u)^(?P[- .,\w\d\'’"()]+) <(?P.+?)>$') class Builder: AVAILABLE_PYTHONS = { '2', '2.7', '3', '3.4', '3.5', '3.6', '3.7' } def __init__(self, poetry, io): self._poetry = poetry self._io = io self._package = poetry.package self._path = poetry.file.parent self._module = Module(self._package.name, self._path.as_posix()) def build(self): raise NotImplementedError() def find_excluded_files(self) -> list: # Checking VCS vcs = get_vcs(self._path) if not vcs: return [] ignored = vcs.get_ignored_files() result = [] for file in ignored: try: file = Path(file).absolute().relative_to(self._path) except ValueError: # Should only happen in tests continue result.append(file) return result def find_files_to_add(self) -> list: """ Finds all files to add to the tarball TODO: Support explicit include/exclude """ excluded = self.find_excluded_files() src = self._module.path to_add = [] for root, dirs, files in os.walk(src.as_posix()): root = Path(root) if root.name == '__pycache__': continue for file in files: file = root / file file = file.relative_to(self._path) if file in excluded: continue if file.suffix == '.pyc': continue self._io.writeln( f' - Adding: {str(file)}', verbosity=self._io.VERBOSITY_VERY_VERBOSE ) to_add.append(file) # Include project files self._io.writeln( f' - Adding: pyproject.toml', verbosity=self._io.VERBOSITY_VERY_VERBOSE ) to_add.append(Path('pyproject.toml')) # If a README is specificed we need to include it # to avoid errors if 'readme' in self._poetry.config: readme = self._path / self._poetry.config['readme'] if readme.exists(): self._io.writeln( f' - Adding: {readme.relative_to(self._path)}', verbosity=self._io.VERBOSITY_VERY_VERBOSE ) to_add.append(readme.relative_to(self._path)) return sorted(to_add) def convert_entry_points(self) -> dict: result = defaultdict(list) # Scripts -> Entry points for name, ep in self._poetry.config.get('scripts', {}).items(): result['console_scripts'].append(f'{name} = {ep}') # Plugins -> entry points for groupname, group in self._poetry.config.get('plugins', {}).items(): for name, ep in sorted(group.items()): result[groupname].append(f'{name} = {ep}') return dict(result) @classmethod def convert_author(cls, author) -> dict: m = AUTHOR_REGEX.match(author) name = m.group('name') email = m.group('email') return { 'name': name, 'email': email } def get_classifers(self): classifiers = [] # Automatically set python classifiers parser = VersionParser() if self._package.python_versions == '*': python_constraint = parser.parse_constraints('~2.7 || ^3.4') else: python_constraint = self._package.python_constraint for version in sorted(self.AVAILABLE_PYTHONS): if python_constraint.matches(Constraint('=', version)): classifiers.append(f'Programming Language :: Python :: {version}') return classifiers def convert_python_version(self): constraint = self._package.python_constraint if isinstance(constraint, MultiConstraint): python_requires = ','.join( [str(c).replace(' ', '') for c in constraint.constraints] ) else: python_requires = str(constraint).replace(' ', '') return python_requires PKXgL6:o#poetry/masonry/builders/complete.pyimport os import tarfile import poetry from contextlib import contextmanager from tempfile import TemporaryDirectory from types import SimpleNamespace from .builder import Builder from .sdist import SdistBuilder from .wheel import WheelBuilder class CompleteBuilder(Builder): def build(self): # We start by building the tarball # We will use it to build the wheel sdist_builder = SdistBuilder(self._poetry, self._io) sdist_file = sdist_builder.build() sdist_info = SimpleNamespace(builder=sdist_builder, file=sdist_file) self._io.writeln('') dist_dir = self._path / 'dist' with self.unpacked_tarball(sdist_file) as tmpdir: wheel_info = WheelBuilder.make_in( poetry.Poetry.create(tmpdir), self._io, dist_dir ) return SimpleNamespace(wheel=wheel_info, sdist=sdist_info) @classmethod @contextmanager def unpacked_tarball(cls, path): tf = tarfile.open(str(path)) with TemporaryDirectory() as tmpdir: tf.extractall(tmpdir) files = os.listdir(tmpdir) assert len(files) == 1, files yield os.path.join(tmpdir, files[0]) PK!vhL poetry/masonry/builders/sdist.pyimport os import tarfile from collections import defaultdict from copy import copy from gzip import GzipFile from io import BytesIO from pathlib import Path from posixpath import join as pjoin from pprint import pformat from typing import List from poetry.packages import Dependency from poetry.semver.constraints import MultiConstraint from ..utils.helpers import normalize_file_permissions from .builder import Builder SETUP = """\ from distutils.core import setup {before} setup( name={name!r}, version={version!r}, description={description!r}, author={author!r}, author_email={author_email!r}, url={url!r}, {extra} ) """ PKG_INFO = """\ Metadata-Version: 1.1 Name: {name} Version: {version} Summary: {summary} Home-page: {home_page} Author: {author} Author-email: {author_email} """ class SdistBuilder(Builder): def __init__(self, poetry, io): super().__init__(poetry, io) def build(self, target_dir: Path = None) -> Path: self._io.writeln(' - Building sdist') if target_dir is None: target_dir = self._path / 'dist' if not target_dir.exists(): target_dir.mkdir(parents=True) target = target_dir / f'{self._package.pretty_name}' \ f'-{self._package.version}.tar.gz' gz = GzipFile(target.as_posix(), mode='wb') tar = tarfile.TarFile(target.as_posix(), mode='w', fileobj=gz, format=tarfile.PAX_FORMAT) try: tar_dir = f'{self._package.pretty_name}-{self._package.version}' files_to_add = self.find_files_to_add() for relpath in files_to_add: path = self._path / relpath tar_info = tar.gettarinfo( str(path), arcname=pjoin(tar_dir, relpath) ) tar_info = self.clean_tarinfo(tar_info) if tar_info.isreg(): with path.open('rb') as f: tar.addfile(tar_info, f) else: tar.addfile(tar_info) # Symlinks & ? setup = self.build_setup() tar_info = tarfile.TarInfo(pjoin(tar_dir, 'setup.py')) tar_info.size = len(setup) tar.addfile(tar_info, BytesIO(setup)) author = self.convert_author(self._package.authors[0]) pkg_info = PKG_INFO.format( name=self._package.name, version=self._package.version, summary=self._package.description, home_page=self._package.homepage or self._package.repository_url, author=author['name'], author_email=author['email'], ).encode('utf-8') tar_info = tarfile.TarInfo(pjoin(tar_dir, 'PKG-INFO')) tar_info.size = len(pkg_info) tar.addfile(tar_info, BytesIO(pkg_info)) finally: tar.close() gz.close() self._io.writeln(f' - Built {target.name}') return target def build_setup(self) -> bytes: before, extra = [], [] if self._module.is_package(): packages, package_data = self.find_packages( self._module.path.as_posix() ) before.append("packages = \\\n{}\n".format(pformat(sorted(packages)))) before.append("package_data = \\\n{}\n".format(pformat(package_data))) extra.append("packages=packages,") extra.append("package_data=package_data,") else: extra.append('py_modules={!r},'.format(self._module.name)) dependencies, extras = self.convert_dependencies(self._package.requires) if dependencies: before.append("install_requires = \\\n{}\n".format(pformat(dependencies))) extra.append("install_requires=install_requires,") if extras: before.append("extras_require = \\\n{}\n".format(pformat(extras))) extra.append("extras_require=extras_require,") entry_points = self.convert_entry_points() if entry_points: before.append("entry_points = \\\n{}\n".format(pformat(entry_points))) extra.append("entry_points=entry_points,") if self._package.python_versions != '*': constraint = self._package.python_constraint if isinstance(constraint, MultiConstraint): python_requires = ','.join( [str(c).replace(' ', '') for c in constraint.constraints] ) else: python_requires = str(constraint).replace(' ', '') extra.append('python_requires={!r},'.format(python_requires)) author = self.convert_author(self._package.authors[0]) return SETUP.format( before='\n'.join(before), name=self._package.name, version=self._package.version, description=self._package.description, author=author['name'], author_email=author['email'], url=self._package.homepage or self._package.repository_url, extra='\n '.join(extra), ).encode('utf-8') @classmethod def find_packages(cls, path: str): """ Discover subpackages and data. It also retrieve necessary files """ pkgdir = os.path.normpath(path) pkg_name = os.path.basename(pkgdir) pkg_data = defaultdict(list) # Undocumented distutils feature: # the empty string matches all package names pkg_data[''].append('*') packages = [pkg_name] subpkg_paths = set() def find_nearest_pkg(rel_path): parts = rel_path.split(os.sep) for i in reversed(range(1, len(parts))): ancestor = '/'.join(parts[:i]) if ancestor in subpkg_paths: pkg = '.'.join([pkg_name] + parts[:i]) return pkg, '/'.join(parts[i:]) # Relative to the top-level package return pkg_name, rel_path for path, dirnames, filenames in os.walk(pkgdir, topdown=True): if os.path.basename(path) == '__pycache__': continue from_top_level = os.path.relpath(path, pkgdir) if from_top_level == '.': continue is_subpkg = '__init__.py' in filenames if is_subpkg: subpkg_paths.add(from_top_level) parts = from_top_level.split(os.sep) packages.append('.'.join([pkg_name] + parts)) else: pkg, from_nearest_pkg = find_nearest_pkg(from_top_level) pkg_data[pkg].append(pjoin(from_nearest_pkg, '*')) # Sort values in pkg_data pkg_data = {k: sorted(v) for (k, v) in pkg_data.items()} return sorted(packages), pkg_data @classmethod def convert_dependencies(cls, dependencies: List[Dependency]): main = [] extras = [] for dependency in dependencies: requirement = dependency.to_pep_508() if ';' in requirement: extras.append(requirement) else: main.append(requirement) return main, extras @classmethod def clean_tarinfo(cls, tar_info): """ Clean metadata from a TarInfo object to make it more reproducible. - Set uid & gid to 0 - Set uname and gname to "" - Normalise permissions to 644 or 755 - Set mtime if not None """ ti = copy(tar_info) ti.uid = 0 ti.gid = 0 ti.uname = '' ti.gname = '' ti.mode = normalize_file_permissions(ti.mode) return ti PK hLHb## poetry/masonry/builders/wheel.pyimport contextlib import hashlib import os import re import tempfile import stat import zipfile from base64 import urlsafe_b64encode from io import StringIO from pathlib import Path from types import SimpleNamespace from poetry.__version__ import __version__ from poetry.semver.constraints import Constraint from poetry.semver.constraints import MultiConstraint from ..utils.helpers import normalize_file_permissions from .builder import Builder wheel_file_template = """\ Wheel-Version: 1.0 Generator: poetry {version} Root-Is-Purelib: true """.format(version=__version__) class WheelBuilder(Builder): def __init__(self, poetry, io, target_fp): super().__init__(poetry, io) self._records = [] # Open the zip file ready to write self._wheel_zip = zipfile.ZipFile(target_fp, 'w', compression=zipfile.ZIP_DEFLATED) @classmethod def make_in(cls, poetry, io, directory) -> SimpleNamespace: # We don't know the final filename until metadata is loaded, so write to # a temporary_file, and rename it afterwards. (fd, temp_path) = tempfile.mkstemp(suffix='.whl', dir=str(directory)) try: with open(fd, 'w+b') as fp: wb = WheelBuilder(poetry, io, fp) wb.build() wheel_path = directory / wb.wheel_filename os.replace(temp_path, str(wheel_path)) except: os.unlink(temp_path) raise return SimpleNamespace(builder=wb, file=wheel_path) @classmethod def make(cls, poetry, io) -> SimpleNamespace: """Build a wheel in the dist/ directory, and optionally upload it. """ dist_dir = poetry.file.parent / 'dist' try: dist_dir.mkdir() except FileExistsError: pass return cls.make_in(poetry, io, dist_dir) def build(self) -> None: self._io.writeln(' - Building wheel') try: self.copy_module() self.write_metadata() self.write_record() finally: self._wheel_zip.close() self._io.writeln(f' - Built {self.wheel_filename}') def copy_module(self) -> None: if self._module.is_package(): files = self.find_files_to_add() # Walk the files and compress them, # sorting everything so the order is stable. for file in sorted(files): full_path = self._path / file # Do not include topmost files if full_path.relative_to(self._path) == Path(file.name): continue self._add_file(full_path, file) else: self._add_file(str(self._module.path), self._module.path.name) def write_metadata(self): if 'scripts' in self._poetry.config or 'plugins' in self._poetry.config: with self._write_to_zip(self.dist_info + '/entry_points.txt') as f: self._write_entry_points(f) for base in ('COPYING', 'LICENSE'): for path in sorted(self._path.glob(base + '*')): self._add_file(path, '%s/%s' % (self.dist_info, path.name)) with self._write_to_zip(self.dist_info + '/WHEEL') as f: self._write_wheel_file(f) with self._write_to_zip(self.dist_info + '/METADATA') as f: self._write_metadata_file(f) def write_record(self): # Write a record of the files in the wheel with self._write_to_zip(self.dist_info + '/RECORD') as f: for path, hash, size in self._records: f.write('{},sha256={},{}\n'.format(path, hash, size)) # RECORD itself is recorded with no hash or size f.write(self.dist_info + '/RECORD,,\n') @property def dist_info(self) -> str: return self.dist_info_name(self._package.name, self._package.version) @property def wheel_filename(self) -> str: tag = ('py2.' if self.supports_python2() else '') + 'py3-none-any' return '{}-{}-{}.whl'.format( re.sub("[^\w\d.]+", "_", self._package.pretty_name, flags=re.UNICODE), re.sub("[^\w\d.]+", "_", self._package.version, flags=re.UNICODE), tag) def supports_python2(self): return self._package.python_constraint.matches( MultiConstraint([ Constraint('>=', '2.0.0'), Constraint('<', '3.0.0') ]) ) def dist_info_name(self, distribution, version) -> str: escaped_name = re.sub("[^\w\d.]+", "_", distribution, flags=re.UNICODE) escaped_version = re.sub("[^\w\d.]+", "_", version, flags=re.UNICODE) return '{}-{}.dist-info'.format(escaped_name, escaped_version) def _add_file(self, full_path, rel_path): full_path, rel_path = str(full_path), str(rel_path) if os.sep != '/': # We always want to have /-separated paths in the zip file and in # RECORD rel_path = rel_path.replace(os.sep, '/') zinfo = zipfile.ZipInfo.from_file(full_path, rel_path) # Normalize permission bits to either 755 (executable) or 644 st_mode = os.stat(full_path).st_mode new_mode = normalize_file_permissions(st_mode) zinfo.external_attr = (new_mode & 0xFFFF) << 16 # Unix attributes if stat.S_ISDIR(st_mode): zinfo.external_attr |= 0x10 # MS-DOS directory flag hashsum = hashlib.sha256() with open(full_path, 'rb') as src, self._wheel_zip.open(zinfo, 'w') as dst: while True: buf = src.read(1024 * 8) if not buf: break hashsum.update(buf) dst.write(buf) size = os.stat(full_path).st_size hash_digest = urlsafe_b64encode(hashsum.digest()).decode( 'ascii').rstrip('=') self._records.append((rel_path, hash_digest, size)) @contextlib.contextmanager def _write_to_zip(self, rel_path): sio = StringIO() yield sio # The default is a fixed timestamp rather than the current time, so # that building a wheel twice on the same computer can automatically # give you the exact same result. date_time = (2016, 1, 1, 0, 0, 0) zi = zipfile.ZipInfo(rel_path, date_time) b = sio.getvalue().encode('utf-8') hashsum = hashlib.sha256(b) hash_digest = urlsafe_b64encode( hashsum.digest() ).decode('ascii').rstrip('=') self._wheel_zip.writestr(zi, b, compress_type=zipfile.ZIP_DEFLATED) self._records.append((rel_path, hash_digest, len(b))) def _write_entry_points(self, fp): """ Write entry_points.txt. """ entry_points = self.convert_entry_points() for group_name in sorted(entry_points): fp.write('[{}]\n'.format(group_name)) for ep in sorted(entry_points[group_name]): fp.write(ep.replace(' ', '')) fp.write('\n') def _write_wheel_file(self, fp): fp.write(wheel_file_template) if self.supports_python2(): fp.write("Tag: py2-none-any\n") fp.write("Tag: py3-none-any\n") def _write_metadata_file(self, fp): """ Write out metadata in the 1.x format (email like) """ fp.write('Metadata-Version: 1.2\n') fp.write(f'Name: {self._package.name}\n') fp.write(f'Version: {self._package.version}\n') fp.write(f'Summary: {self._package.description}\n') fp.write(f'Home-page: {self._package.homepage or self._package.repository_url or "UNKNOWN"}\n') fp.write(f'License: {self._package.license or "UNKOWN"}\n') # Optional fields if self._package.keywords: fp.write(f"Keywords: {','.join(self._package.keywords)}\n") if self._package.authors: author = self.convert_author(self._package.authors[0]) fp.write(f'Author: {author["name"]}\n') fp.write(f'Author-email: {author["email"]}\n') if self._package.python_versions != '*': constraint = self._package.python_constraint if isinstance(constraint, MultiConstraint): python_requires = ','.join( [str(c).replace(' ', '') for c in constraint.constraints] ) else: python_requires = str(constraint).replace(' ', '') fp.write(f'Requires-Python: {python_requires}\n') classifiers = self.get_classifers() for classifier in classifiers: fp.write(f'Classifier: {classifier}\n') for dep in self._package.requires: fp.write('Requires-Dist: {}\n'.format(dep.to_pep_508())) if self._package.readme is not None: fp.write('\n' + self._package.readme + '\n') PKuhLspoetry/masonry/metadata.pyfrom poetry.semver.constraints import MultiConstraint from poetry.utils.helpers import canonicalize_name class Metadata: metadata_version = '1.2' # version 1.0 name = None version = None platforms = () supported_platforms = () summary = None description = None keywords = None home_page = None download_url = None author = None author_email = None license = None # version 1.1 classifiers = () requires = () provides = () obsoletes = () # version 1.2 maintainer = None maintainer_email = None requires_python = None requires_external = () requires_dist = () provides_dist = () obsoletes_dist = () project_urls = () @classmethod def from_package(cls, package) -> 'Metadata': meta = cls() meta.name = canonicalize_name(package.name) meta.version = package.version meta.summary = package.description meta.description = package.readme meta.keywords = ','.join(package.keywords) meta.home_page = package.homepage or package.repository_url meta.author = package.author_name meta.author_email = package.author_email meta.license = package.license meta.classifiers = package.classifiers # Version 1.2 meta.maintainer = meta.author meta.maintainer_email = meta.author_email meta.requires_python = package.python_constraint meta.requires_dist = [d.to_pep_508() for d in package.requires] # Requires python constraint = package.python_constraint if isinstance(constraint, MultiConstraint): python_requires = ','.join( [str(c).replace(' ', '') for c in constraint.constraints] ) else: python_requires = str(constraint).replace(' ', '') meta.requires_python = python_requires return meta PKUhL5k!!%poetry/masonry/publishing/__init__.pyfrom .publisher import Publisher PKhL>**&poetry/masonry/publishing/publisher.pyimport hashlib import io import re import requests import toml from pathlib import Path from requests import adapters from requests.exceptions import HTTPError from requests.packages.urllib3 import util from requests_toolbelt import user_agent from requests_toolbelt.multipart import ( MultipartEncoder, MultipartEncoderMonitor ) from poetry import __version__ from poetry.locations import CONFIG_DIR from ..metadata import Metadata wheel_file_re = re.compile( r"""^(?P(?P.+?)(-(?P\d.+?))?) ((-(?P\d.*?))?-(?P.+?)-(?P.+?)-(?P.+?) \.whl|\.dist-info)$""", re.VERBOSE ) KEYWORDS_TO_NOT_FLATTEN = {'gpg_signature', 'content'} class Publisher: """ Registers and publishes packages to remote repositories. """ def __init__(self, poetry, io): self._poetry = poetry self._package = poetry.package self._io = io def publish(self, repository_name): if repository_name: self._io.writeln( f'Publishing {self._package.pretty_name} ' f'({self._package.pretty_version}) ' f'to {repository_name}' ) else: self._io.writeln( f'Publishing {self._package.pretty_name} ' f'({self._package.pretty_version}) ' f'to PyPI' ) if not repository_name: url = 'https://upload.pypi.org/legacy/' repository_name = 'pypi' else: # Retrieving config information config_file = Path(CONFIG_DIR) / 'config.toml' if not config_file.exists(): raise RuntimeError( 'Config file does not exist. ' 'Unable to get repository information' ) with config_file.open() as f: config = toml.loads(f.read()) if ( 'repositories' not in config or repository_name not in config['repositories'] ): raise RuntimeError( f'Repository {repository_name} is not defined' ) url = config['repositories'][repository_name]['url'] username = None password = None auth_file = Path(CONFIG_DIR) / 'auth.toml' if auth_file.exists(): with auth_file.open() as f: auth_config = toml.loads(f.read()) if 'http-basic' in auth_config and repository_name in auth_config['http-basic']: config = auth_config['http-basic'][repository_name] username = config.get('username') password = config.get('password') # Requesting missing credentials if not username: username = self._io.ask('Username:') if not password: password = self._io.ask_hidden('Password:') session = requests.session() session.auth = (username, password) session.headers['User-Agent'] = self._make_user_agent_string() for scheme in ('http://', 'https://'): session.mount(scheme, self._make_adapter_with_retries()) # TODO: handle certificates try: self.upload(session, url) finally: session.close() def register(self, session, url): """ Register a package to a repository. """ dist = self._poetry.file.parent / 'dist' file = dist / f'{self._package.name}-{self._package.version}.tar.gz' if not file.exists(): raise RuntimeError( '"{0}" does not exist on the file system.'.format(file.name) ) data = self.post_data(file) data.update({ ":action": "submit", "protocol_version": "1", }) data_to_send = self._convert_data_to_list_of_tuples(data) encoder = MultipartEncoder(data_to_send) resp = session.post( url, data=encoder, allow_redirects=False, headers={'Content-Type': encoder.content_type}, ) return resp def upload(self, session, url): """ Upload packages for the current project. """ try: self._upload(session, url) except HTTPError as e: if ( e.response.status_code not in (403, 400) or e.response.status_code == 400 and 'was ever registered' not in e.response.text ): raise # It may be the first time we publish the package # We'll try to register it and go from there try: self.register(session, url) except HTTPError: raise def post_data(self, file): meta = Metadata.from_package(self._package) file_type = self._get_type(file) blake2_256_hash = hashlib.blake2b(digest_size=256 // 8) md5_hash = hashlib.md5() sha2_hash = hashlib.sha256() with file.open('rb') as fp: for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b''): md5_hash.update(content) sha2_hash.update(content) blake2_256_hash.update(content) md5_digest = md5_hash.hexdigest() sha2_digest = sha2_hash.hexdigest() blake2_256_digest = blake2_256_hash.hexdigest() if file_type == 'bdist_wheel': wheel_info = wheel_file_re.match(file.name) py_version = wheel_info.group("pyver") else: py_version = None return { # identify release "name": meta.name, "version": meta.version, # file content "filetype": file_type, "pyversion": py_version, # additional meta-data "metadata_version": meta.metadata_version, "summary": meta.summary, "home_page": meta.home_page, "author": meta.author, "author_email": meta.author_email, "maintainer": meta.maintainer, "maintainer_email": meta.maintainer_email, "license": meta.license, "description": meta.description, "keywords": meta.keywords, "platform": meta.platforms, "classifiers": meta.classifiers, "download_url": meta.download_url, "supported_platform": meta.supported_platforms, "comment": None, "md5_digest": md5_digest, "sha256_digest": sha2_digest, "blake2_256_digest": blake2_256_digest, # PEP 314 "provides": meta.provides, "requires": meta.requires, "obsoletes": meta.obsoletes, # Metadata 1.2 "project_urls": meta.project_urls, "provides_dist": meta.provides_dist, "obsoletes_dist": meta.obsoletes_dist, "requires_dist": meta.requires_dist, "requires_external": meta.requires_external, "requires_python": meta.requires_python, } def _upload(self, session, url): dist = self._poetry.file.parent / 'dist' packages = dist.glob(f'{self._package.name}-{self._package.version}*') files = ( i for i in packages if ( i.match(f'{self._package.name}-{self._package.version}-*.whl') or i.match(f'{self._package.name}-{self._package.version}.tar.gz') ) ) for file in files: # TODO: Check existence resp = self._upload_file(session, url, file) # Bug 92. If we get a redirect we should abort because something seems # funky. The behaviour is not well defined and redirects being issued # by PyPI should never happen in reality. This should catch malicious # redirects as well. if resp.is_redirect: raise RuntimeError( ('"{0}" attempted to redirect to "{1}" during upload.' ' Aborting...').format(url, resp.headers["location"])) resp.raise_for_status() def _upload_file(self, session, url, file): data = self.post_data(file) data.update({ # action ":action": "file_upload", "protocol_version": "1", }) data_to_send = self._convert_data_to_list_of_tuples(data) with file.open('rb') as fp: data_to_send.append(( "content", (file.name, fp, "application/octet-stream"), )) encoder = MultipartEncoder(data_to_send) bar = self._io.create_progress_bar(encoder.len) bar.set_format( " - Uploading {0} %percent%%".format( file.name ) ) monitor = MultipartEncoderMonitor( encoder, lambda monitor: bar.set_progress(monitor.bytes_read) ) bar.start() resp = session.post( url, data=monitor, allow_redirects=False, headers={'Content-Type': monitor.content_type} ) if resp.ok: bar.finish() self._io.writeln('') else: self._io.overwrite('') return resp def _convert_data_to_list_of_tuples(self, data): data_to_send = [] for key, value in data.items(): if (key in KEYWORDS_TO_NOT_FLATTEN or not isinstance(value, (list, tuple))): data_to_send.append((key, value)) else: for item in value: data_to_send.append((key, item)) return data_to_send def _get_type(self, file): exts = file.suffixes if exts[-1] == '.whl': return 'bdist_wheel' elif len(exts) >= 2 and ''.join(exts[-2:]) == '.tar.gz': return 'sdist' raise ValueError( f'Unknown distribution format {"".join(exts)}' ) @staticmethod def _make_adapter_with_retries(): retry = util.Retry( connect=5, total=10, method_whitelist=['GET'], status_forcelist=[500, 501, 502, 503], ) return adapters.HTTPAdapter(max_retries=retry) @staticmethod def _make_user_agent_string(): return user_agent( 'twine', __version__, ) PK2QfL poetry/masonry/utils/__init__.pyPK2QfLƨC88poetry/masonry/utils/helpers.pydef normalize_file_permissions(st_mode): """ Normalizes the permission bits in the st_mode field from stat to 644/755 Popular VCSs only track whether a file is executable or not. The exact permissions can vary on systems with different umasks. Normalising to 644 (non executable) or 755 (executable) makes builds more reproducible. """ # Set 644 permissions, leaving higher bits of st_mode unchanged new_mode = (st_mode | 0o644) & ~0o133 if st_mode & 0o100: new_mode |= 0o111 # Executable: 644 -> 755 return new_mode PK2QfLD.gsspoetry/masonry/utils/module.pyfrom pathlib import Path from poetry.utils.helpers import module_name class Module: def __init__(self, name, directory='.'): self._name = module_name(name) # It must exist either as a .py file or a directory, but not both pkg_dir = Path(directory, self._name) py_file = Path(directory, self._name + '.py') if pkg_dir.is_dir() and py_file.is_file(): raise ValueError("Both {} and {} exist".format(pkg_dir, py_file)) elif pkg_dir.is_dir(): self._path = pkg_dir self._is_package = True elif py_file.is_file(): self._path = py_file self._is_package = False else: raise ValueError("No file/folder found for package {}".format(name)) @property def name(self) -> str: return self._name @property def path(self) -> Path: return self._path @property def file(self) -> Path: if self._is_package: return self._path / '__init__.py' else: return self._path def is_package(self) -> bool: return self._is_package PKzWL;8xMMpoetry/mixology/__init__.pyfrom .dependency_graph import DependencyGraph from .resolver import Resolver PK str: return 'user-specified dependency' @property def name_for_locking_dependency_source(self) -> str: return 'Lockfile' def search_for(self, dependency: Any) -> List[Any]: """ Search for the specifications that match the given dependency. The specifications in the returned list will be considered in reverse order, so the latest version ought to be last. """ return [] def dependencies_for(self, specification: Any) -> List[Any]: """ Returns the dependencies of specification. """ return [] def is_requirement_satisfied_by(self, requirement: Any, activated: DependencyGraph, spec: Any) -> bool: """ Determines whether the given requirement is satisfied by the given spec, in the context of the current activated dependency graph. """ return True def name_for(self, dependency: Any) -> str: """ Returns the name for the given dependency. """ return str(dependency) def sort_dependencies(self, dependencies: List[Any], activated: DependencyGraph, conflicts: Dict[str, List[Conflict]]) -> List[Any]: """ Sort dependencies so that the ones that are easiest to resolve are first. Easiest to resolve is (usually) defined by: 1) Is this dependency already activated? 2) How relaxed are the requirements? 3) Are there any conflicts for this dependency? 4) How many possibilities are there to satisfy this dependency? """ return sorted( dependencies, key=lambda dep: ( activated.vertex_named(self.name_for(dep)).payload is None, conflicts.get(self.name_for(dep) is None) ) ) def allow_missing(self, dependency) -> bool: """ Returns whether this dependency, which has no possible matching specifications, can safely be ignored. """ return False PKݮYL"-WWpoetry/mixology/contracts/ui.pyimport sys class UI: def __init__(self, debug=False): self._debug = debug @property def output(self): return sys.stdout @property def progress_rate(self) -> float: return 0.33 def is_debugging(self) -> bool: return self._debug def indicate_progress(self) -> None: self.output.write('.') def before_resolution(self) -> None: self.output.write('Resolving dependencies...\n') def after_resolution(self) -> None: self.output.write('') def debug(self, message, depth) -> None: if self.is_debugging(): debug_info = str(message) debug_info = '\n'.join([ ':{}: {}'.format(str(depth).rjust(4), s) for s in debug_info.split('\n') ]) + '\n' self.output.write(debug_info) PKTZL%7  #poetry/mixology/dependency_graph.pyfrom .exceptions import CircularDependencyError from .graph.log import Log class DependencyGraph: def __init__(self): self._vertices = {} self._log = Log() @property def vertices(self): return self._vertices @property def log(self): return self._log def tag(self, tag): return self._log.tag(self, tag) def rewind_to(self, tag): return self._log.rewind_to(self, tag) def add_child_vertex(self, name, payload, parent_names, requirement): root = True try: parent_names.index(None) except ValueError: root = False parent_names = [n for n in parent_names if n is not None] vertex = self.add_vertex(name, payload, root) if root: vertex.explicit_requirements.append(requirement) for parent_name in parent_names: parent_vertex = self.vertex_named(parent_name) self.add_edge(parent_vertex, vertex, requirement) return vertex def add_vertex(self, name, payload, root=False): return self._log.add_vertex(self, name, payload, root) def detach_vertex_named(self, name): return self._log.detach_vertex_named(self, name) def vertex_named(self, name): return self.vertices.get(name) def root_vertex_named(self, name): vertex = self.vertex_named(name) if vertex and vertex.root: return vertex def add_edge(self, origin, destination, requirement): if destination.has_path_to(origin): raise CircularDependencyError([origin, destination]) return self.add_edge_no_circular(origin, destination, requirement) def add_edge_no_circular(self, origin, destination, requirement): self._log.add_edge_no_circular( self, origin.name, destination.name, requirement ) def delete_edge(self, edge): return self._log.delete_edge( self, edge.origin.name, edge.destination.name, edge.requirement ) def set_payload(self, name, payload): return self._log.set_payload(self, name, payload) def to_dot(self): dot_vertices = [] dot_edges = [] for n, v in self.vertices.items(): dot_vertices.append( ' {} [label="{}|{}"]'.format(n, n, v.payload or '') ) for e in v.outgoing_edges: label = e.requirement dot_edges.append( ' {} -> {} [label="{}"]'.format( e.origin.name, e.destination.name, label ) ) dot_vertices = sorted(set(dot_vertices)) dot_edges = sorted(set(dot_edges)) dot_vertices.insert(0, 'digraph G {') dot_vertices.append('') dot_edges.append('}') dot = dot_vertices + dot_edges return '\n'.join(dot) def __iter__(self): return iter(self.vertices.values()) PKYLV[ poetry/mixology/exceptions.pyfrom .helpers import flat_map class ResolverError(Exception): pass class NoSuchDependencyError(ResolverError): def __init__(self, dependency, required_by=None): if required_by is None: required_by = [] sources = ' and '.join(['"{}"'.format(r) for r in required_by]) message = 'Unable to find a specification for "{}"'.format(dependency) if sources: message += ' depended upon by {}'.format(sources) super().__init__(message) class CircularDependencyError(ResolverError): def __init__(self, vertices): super(CircularDependencyError, self).__init__( 'There is a circular dependency between {}'.format( ' and '.join([v.name for v in vertices]) ) ) self._dependencies = [v.payload.possibilities[-1] for v in vertices] @property def dependencies(self): return self._dependencies class VersionConflict(ResolverError): def __init__(self, conflicts, specification_provider): pairs = [] for conflicting in flat_map( list(conflicts.values()), lambda x: x.requirements ): for source, conflict_requirements in conflicting.items(): for c in conflict_requirements: pairs.append((c, source)) super().__init__( 'Unable to satisfy the following requirements:\n\n' '{}'.format( '\n'.join('- "{}" required by "{}"'.format(r, d) for r, d in pairs) ) ) self._conflicts = conflicts self._specification_provider = specification_provider @property def conflicts(self): return self._conflicts @property def specification_provider(self): return self._specification_provider def message_with_trees(self, solver_name='Poetry', possibility_type='possibility named', reduce_trees=lambda trees: sorted(set(trees), key=str), printable_requirement=str, message_for_conflict=None, version_for_spec=str): o = [] for name, conflict in sorted(self._conflicts): o.append( '\n{} could not find compatible versions for {} "{}"_n'.format( solver_name, possibility_type, name ) ) if conflict.locked_requirement: o.append( ' In snapshot ({}):\n'.format( self._specification_provider.name_for_locking_dependency_source ) ) o.append( ' {}\n'.format( printable_requirement(conflict.locked_requirement) ) ) o.append('\n') o.append( ' In {}:\n'.format( self._specification_provider.name_for_explicit_dependency_source ) ) trees = reduce_trees(conflict.requirement_trees) ot = [] for tree in trees: t = '' depth = 2 for req in tree: t += ' ' * depth + str(req) if tree[-1] != req: spec = conflict.activated_by_name.get( self._specification_provider.name_for(req) ) if spec: t += ' was resolved to {}, which'.format( version_for_spec(spec) ) t += ' depends on' t += '\n' depth += 1 ot.append(t) o.append('\n'.join(ot)) if message_for_conflict: message_for_conflict(o, name, conflict) return ''.join(o).strip() PKVL!poetry/mixology/graph/__init__.pyPKXLKƢpoetry/mixology/graph/action.pyfrom typing import Any class Action: def __init__(self): self.previous = None self.next = None @property def action_name(self) -> str: raise NotImplementedError() def up(self, graph: 'DependencyGraph') -> Any: """ Performs the action on the given graph. """ raise NotImplementedError() def down(self, graph: 'DependencyGraph') -> None: """ Reverses the action on the given graph. """ raise NotImplementedError() PK1_ZLOwzz-poetry/mixology/graph/add_edge_no_circular.pyfrom .action import Action from .edge import Edge class AddEdgeNoCircular(Action): def __init__(self, origin, destination, requirement): super(AddEdgeNoCircular, self).__init__() self._origin = origin self._destination = destination self._requirement = requirement @property def action_name(self): return 'add_edge_no_circular' @property def origin(self): return self._origin @property def destination(self): return self._destination @property def requirement(self): return self._requirement def up(self, graph): edge = self.make_edge(graph) edge.origin.outgoing_edges.append(edge) edge.destination.incoming_edges.append(edge) return edge def down(self, graph): edge = self.make_edge(graph) self._delete_first(edge.origin.outgoing_edges, edge) self._delete_first(edge.destination.incoming_edges, edge) def make_edge(self, graph): return Edge( graph.vertex_named(self._origin), graph.vertex_named(self._destination), self._requirement ) def _delete_first(self, elements, element): """ :type elements: list """ try: index = elements.index(element) except ValueError: return del elements[index] PKYLeDD#poetry/mixology/graph/add_vertex.pyfrom .action import Action from .vertex import Vertex _NULL = object() class AddVertex(Action): def __init__(self, name, payload, root): """ :param name: The name of the vertex. :type name: str :param payload: The payload of he vertex :type payload: Any :param root: whether the vertex is root or not :type root: bool """ super(AddVertex, self).__init__() self._name = name self._payload = payload self._root = root self._existing_payload = _NULL self._existing_root = None @property def action_name(self): return 'add_vertex' @property def name(self): return self._name @property def payload(self): return self._payload @property def root(self): return self._root def up(self, graph): existing = graph.vertices.get(self._name) if existing: self._existing_payload = existing.payload self._existing_root = existing.root vertex = existing or Vertex(self._name, self._payload) graph.vertices[vertex.name] = vertex if not vertex.payload: vertex.payload = self.payload if not vertex.root: vertex.root = self.root return vertex def down(self, graph): if self._existing_payload is not _NULL: vertex = graph.vertices[self._name] vertex.payload = self._existing_payload vertex.root = self._existing_root else: del graph.vertices[self._name] PKXZLj^^$poetry/mixology/graph/delete_edge.pyfrom .action import Action from .edge import Edge class DeleteEdge(Action): def __init__(self, origin, destination, requirement): super(DeleteEdge, self).__init__() self._origin = origin self._destination = destination self._requirement = requirement @property def action_name(self): return 'delete_edge' @property def origin(self): return self._origin @property def destination(self): return self._destination @property def requirement(self): return self._requirement def up(self, graph): edge = self.make_edge(graph) self._delete_first(edge.origin.outgoing_edges, edge) self._delete_first(edge.destination.incoming_edges, edge) return edge def down(self, graph): edge = self.make_edge(graph) edge.origin.outgoing_edges.append(edge) edge.origin.incoming_edges.append(edge) def make_edge(self, graph): return Edge( graph.vertex_named(self._origin), graph.vertex_named(self._destination), self._requirement ) def _delete_first(self, elements, element): """ :type elements: list """ try: index = elements.index(element) except ValueError: return del elements[index] PK cL͍ ,poetry/mixology/graph/detach_vertex_named.pyfrom .action import Action class DetachVertexNamed(Action): def __init__(self, name): super(DetachVertexNamed, self).__init__() self._name = name self._vertex = None @property def action_name(self): return 'detach_vertex' @property def name(self): return self._name def up(self, graph): if self._name not in graph.vertices: return [] self._vertex = graph.vertices[self._name] del graph.vertices[self._name] removed_vertices = [self._vertex] for e in self._vertex.outgoing_edges: v = e.destination try: v.incoming_edges.remove(e) except ValueError: pass if not v.root and not v.incoming_edges: removed_vertices += graph.detach_vertex_named(v.name) for e in self._vertex.incoming_edges: v = e.origin try: v.outgoing_edges.remove(e) except ValueError: pass return removed_vertices def down(self, graph): if self._vertex is None: return graph.vertices[self._vertex.name] = self._vertex for e in self._vertex.outgoing_edges: e.destination.incoming_edges.append(e) for e in self._vertex.incoming_edges: e.origin.outgoing_edges.append(e) PKhXZLNpoetry/mixology/graph/edge.pyclass Edge: """ A directed edge of a DependencyGraph """ def __init__(self, origin, destination, requirement): self._origin = origin self._destination = destination self._requirement = requirement @property def origin(self): return self._origin @property def destination(self): return self._destination @property def requirement(self): return self._requirement def __eq__(self, other): return self._origin == other.origin and self._destination == other.destination def __repr__(self): return ' {}>'.format( self._origin.name, self._destination.name ) PKiWZLT0 0 poetry/mixology/graph/log.pyfrom .add_edge_no_circular import AddEdgeNoCircular from .add_vertex import AddVertex from .delete_edge import DeleteEdge from .detach_vertex_named import DetachVertexNamed from .set_payload import SetPayload from .tag import Tag class Log: """ A log for dependency graph actions. """ def __init__(self): self._current_action = None self._first_action = None def tag(self, graph, tag): """ Tags the current state of the dependency as the given tag. """ return self._push_action(graph, Tag(tag)) def add_vertex(self, graph, name, payload, root): return self._push_action(graph, AddVertex(name, payload, root)) def detach_vertex_named(self, graph, name): return self._push_action(graph, DetachVertexNamed(name)) def add_edge_no_circular(self, graph, origin, destination, requirement): action = AddEdgeNoCircular(origin, destination, requirement) return self._push_action(graph, action) def delete_edge(self, graph, origin, destination, requirement): action = DeleteEdge(origin, destination, requirement) return self._push_action(graph, action) def set_payload(self, graph, name, payload): return self._push_action(graph, SetPayload(name, payload)) def pop(self, graph): action = self._current_action if not action: return self._current_action = action.previous if not self._current_action: self._first_action = None action.down(graph) return action def rewind_to(self, graph, tag): while True: action = self.pop(graph) if not action: raise ValueError('No tag "{}" found'.format(tag)) if isinstance(action, Tag) and action.tag == tag: break def _push_action(self, graph, action): """ Adds the given action to the log, running the action :param graph: The graph :param action: The action :type action: Action """ action.previous = self._current_action if self._current_action: self._current_action.next = action self._current_action = action if not self._first_action: self._first_action = action return action.up(graph) PKWL$poetry/mixology/graph/set_payload.pyfrom .action import Action class SetPayload(Action): def __init__(self, name, payload): super(SetPayload, self).__init__() self._name = name self._payload = payload self._old_payload = None @property def action_name(self): return 'set_payload' @property def name(self): return self._name @property def payload(self): return self._payload def up(self, graph): vertex = graph.vertex_named(self._name) self._old_payload = vertex.payload vertex.payload = self._payload def down(self, graph): graph.vertex_named(self._name).payload = self._old_payload PKVLlZ9UUpoetry/mixology/graph/tag.pyfrom .action import Action class Tag(Action): def __init__(self, tag): super(Tag, self).__init__() self._tag = tag @property def action_name(self): return 'tag' @property def tag(self): return self._tag def up(self, graph): pass def down(self, graph): pass PKYZLڀ_ _ poetry/mixology/graph/vertex.pyfrom ..utils import unique class Vertex: def __init__(self, name, payload): self.name = name self.payload = payload self.root = False self._explicit_requirements = [] self.outgoing_edges = [] self.incoming_edges = [] @property def explicit_requirements(self): return self._explicit_requirements @property def requirements(self): return unique([ edge.requirement for edge in self.incoming_edges ] + self._explicit_requirements) @property def predecessors(self): return [edge.origin for edge in self.incoming_edges] @property def recursive_predecessors(self): return self._recursive_predecessors() def _recursive_predecessors(self, vertices=None): if vertices is None: vertices = set() for edge in self.incoming_edges: vertex = edge.origin if vertex in vertices: continue vertices.add(vertex) vertex._recursive_predecessors(vertices) return vertices @property def successors(self): return [ edge.destination for edge in self.outgoing_edges ] @property def recursive_successors(self): return self._recursive_successors() def _recursive_successors(self, vertices=None): if vertices is None: vertices = set() for edge in self.outgoing_edges: vertex = edge.destination if vertex in vertices: continue vertices.add(vertex) vertex._recursive_successors(vertices) return vertices def __eq__(self, other): if not isinstance(other, Vertex): return NotImplemented if self is other: return True return ( self.name == other.name and self.payload == other.payload and set(self.successors) == set(other.successors) ) def __hash__(self): return hash(self.name) def has_path_to(self, other): return ( self == other or any([v.has_path_to(other) for v in self.successors]) ) def is_ancestor(self, other): return other.path_to(self) def __repr__(self): return ''.format(self.name, self.payload) PKTWLC_Npoetry/mixology/helpers.pydef flat_map(iter, callable): if not isinstance(iter, (list, tuple)): yield callable(iter) else: for v in iter: for i in flat_map(v, callable): yield i PKTZLp"poetry/mixology/possibility_set.pyclass PossibilitySet: def __init__(self, dependencies, possibilities): self.dependencies = dependencies self.possibilities = possibilities @property def latest_version(self): if self.possibilities: return self.possibilities[-1] def __str__(self): return '[{}]'.format(', '.join([str(p) for p in self.possibilities])) def __repr__(self): return f'' PKC_ZLmp~~poetry/mixology/resolution.pyimport logging from copy import copy from datetime import datetime from typing import Any from typing import List from .contracts import SpecificationProvider from .contracts import UI from .exceptions import CircularDependencyError from .exceptions import VersionConflict from .conflict import Conflict from .dependency_graph import DependencyGraph from .helpers import flat_map from .possibility_set import PossibilitySet from .state import DependencyState from .state import ResolutionState from .unwind_details import UnwindDetails from .utils import unique logger = logging.getLogger(__name__) class Resolution: def __init__(self, provider: SpecificationProvider, ui: UI, requested: List[Any], base: DependencyGraph): self._provider = provider self._ui = ui self._requested = requested self._original_requested = copy(requested) self._base = base self._states = [] self._iteration_counter = 0 self._progress_rate = 0.33 self._iteration_rate = None self._parents_of = {} self._started_at = None @property def provider(self) -> SpecificationProvider: return self._provider @property def ui(self) -> UI: return self._ui @property def requested(self) -> List[Any]: return self._requested @property def base(self) -> DependencyGraph: return self._base @property def activated(self) -> DependencyGraph: return self.state.activated def resolve(self) -> DependencyGraph: """ Resolve the original requested dependencies into a full dependency graph. """ self._start() try: while self.state: if not self.state.requirement and not self.state.requirements: break self._indicate_progress() if hasattr(self.state, 'pop_possibility_state'): self._debug( f'Creating possibility state for ' f'{str(self.state.requirement)} ' f'({len(self.state.possibilities)} remaining)' ) s = self.state.pop_possibility_state() if s: self._states.append(s) self.activated.tag(s) self._process_topmost_state() return self._resolve_activated_specs() finally: self._end() def _start(self) -> None: """ Set up the resolution process. """ self._started_at = datetime.now() self._debug( f'Starting resolution ({self._started_at})\n' f'Requested dependencies: ' f'{[str(d) for d in self._original_requested]}' ) self._ui.before_resolution() self._handle_missing_or_push_dependency_state(self._initial_state()) def _resolve_activated_specs(self) -> DependencyGraph: for vertex in self.activated.vertices.values(): if not vertex.payload: continue latest_version = None for possibility in reversed(list(vertex.payload.possibilities)): if all( [ self._provider.is_requirement_satisfied_by( req, self.activated, possibility ) for req in vertex.requirements ] ): latest_version = possibility break self.activated.set_payload(vertex.name, latest_version) return self.activated def _end(self) -> None: """ Ends the resolution process """ elapsed = (datetime.now() - self._started_at).total_seconds() self._ui.after_resolution() self._debug( f'Finished resolution ({self._iteration_counter} steps) ' f'in {elapsed:.3f} seconds' ) def _process_topmost_state(self) -> None: """ Processes the topmost available RequirementState on the stack. """ try: if self.possibility: self._attempt_to_activate() else: self._create_conflict() self._unwind_for_conflict() except CircularDependencyError as e: self._create_conflict(e) self._unwind_for_conflict() @property def possibility(self) -> PossibilitySet: """ The current possibility that the resolution is trying. """ if self.state.possibilities: return self.state.possibilities[-1] @property def state(self) -> DependencyState: """ The current state the resolution is operating upon. """ if self._states: return self._states[-1] @property def name(self) -> str: return self.state.name @property def requirement(self) -> Any: return self.state.requirement def _initial_state(self) -> DependencyState: """ Create the initial state for the resolution, based upon the requested dependencies. """ graph = DependencyGraph() for requested in self._original_requested: vertex = graph.add_vertex( self._provider.name_for(requested), None, True ) vertex.explicit_requirements.append(requested) graph.tag('initial_state') requirements = self._provider.sort_dependencies( self._original_requested, graph, {} ) initial_requirement = None if requirements: initial_requirement = requirements.pop(0) name = None if initial_requirement: name = self._provider.name_for(initial_requirement) return DependencyState( name, requirements, graph, initial_requirement, self._possibilities_for_requirement(initial_requirement, graph), 0, {}, [] ) def _unwind_for_conflict(self) -> None: """ Unwinds the states stack because a conflict has been encountered """ details_for_unwind = self._build_details_for_unwind() unwind_options = self.state.unused_unwind_options self._debug( 'Unwinding for conflict: ' '{} to {}'.format( str(self.state.requirement), details_for_unwind.state_index // 2 ), self.state.depth ) conflicts = self.state.conflicts sliced_states = self._states[details_for_unwind.state_index + 1:] self._states = self._states[:details_for_unwind.state_index + 1] self._raise_error_unless_state(conflicts) if sliced_states: self.activated.rewind_to( sliced_states[0] or 'initial_state' ) self.state.conflicts = conflicts self.state.unused_unwind_options = unwind_options self._filter_possibilities_after_unwind(details_for_unwind) index = len(self._states) - 1 for k, l in self._parents_of.items(): self._parents_of[k] = [x for x in l if x < index] self.state.unused_unwind_options = [ uw for uw in self.state.unused_unwind_options if uw.state_index < index ] def _raise_error_unless_state(self, conflicts) -> None: """ Raise a VersionConflict error, or any underlying error, if there is no current state """ if self.state: return errors = [c.underlying_error for c in conflicts.values() if c.underlying_error is not None] if errors: error = errors[0] else: error = VersionConflict(conflicts, self._provider) raise error def _build_details_for_unwind(self) -> UnwindDetails: """ Return the details of the nearest index to which we could unwind. """ # Get the possible unwinds for the current conflict current_conflict = self.state.conflicts[self.state.name] binding_requirements = self._binding_requirements_for_conflict( current_conflict ) unwind_details = self._unwind_options_for_requirements( binding_requirements ) last_detail_for_current_unwind = sorted(unwind_details)[-1] current_detail = last_detail_for_current_unwind # Look for past conflicts that could be unwound to affect the # requirement tree for the current conflict relevant_unused_unwinds = [] for alternative in self.state.unused_unwind_options: intersecting_requirements = ( set(last_detail_for_current_unwind.all_requirements) & set(alternative.requirements_unwound_to_instead) ) if not intersecting_requirements: continue # Find the highest index unwind whilst looping through if alternative > current_detail: current_detail = alternative relevant_unused_unwinds.append(alternative) # Add the current unwind options to the `unused_unwind_options` array. # The "used" option will be filtered out during `unwind_for_conflict`. self.state.unused_unwind_options += [ detail for detail in unwind_details if detail.state_index != -1 ] # Update the requirements_unwound # to_instead on any relevant unused unwinds for d in relevant_unused_unwinds: d.requirements_unwound_to_instead.append( current_detail.state_requirement ) for d in unwind_details: d.requirements_unwound_to_instead.append( current_detail.state_requirement ) return current_detail def _unwind_options_for_requirements(self, binding_requirements): unwind_details = [] trees = [] for r in reversed(binding_requirements): partial_tree = [r] trees.append(partial_tree) unwind_details.append( UnwindDetails( -1, None, partial_tree, binding_requirements, trees, [] ) ) # If this requirement has alternative possibilities, # check if any would satisfy the other requirements # that created this conflict requirement_state = self._find_state_for(r) if self._conflict_fixing_possibilities(requirement_state, binding_requirements): unwind_details.append( UnwindDetails( self._states.index(requirement_state), r, partial_tree, binding_requirements, trees, [] ) ) # Next, look at the parent of this requirement, # and check if the requirement could have been avoided # if an alternative PossibilitySet had been chosen parent_r = self._parent_of(r) if parent_r is None: continue partial_tree.insert(0, parent_r) requirement_state = self._find_state_for(parent_r) possibilities = [ r.name in map(lambda x: x.name, set_.dependencies) for set_ in requirement_state.possibilities ] if any(possibilities): unwind_details.append( UnwindDetails( self._states.index(requirement_state), parent_r, partial_tree, binding_requirements, trees, [] ) ) # Finally, look at the grandparent and up of this requirement, # looking for any possibilities that wouldn't # create their parent requirement grandparent_r = self._parent_of(parent_r) while grandparent_r is not None: partial_tree.insert(0, grandparent_r) requirement_state = self._find_state_for(grandparent_r) possibilities = [ parent_r.name in map(lambda x: x.name, set_.dependencies) for set_ in requirement_state.possibilities ] if any(possibilities): unwind_details.append( UnwindDetails( self._states.index(requirement_state), grandparent_r, partial_tree, binding_requirements, trees, [] ) ) parent_r = grandparent_r grandparent_r = self._parent_of(parent_r) return unwind_details def _conflict_fixing_possibilities(self, state, binding_requirements): """ Return whether or not the given state has any possibilities that could satisfy the given requirements :rtype: bool """ if not state: return False return any([ any([ self._possibility_satisfies_requirements( poss, binding_requirements ) ]) for possibility_set in state.possibilities for poss in possibility_set.possibilities ]) def _filter_possibilities_after_unwind(self, unwind_details): """ Filter a state's possibilities to remove any that would not fix the conflict we've just rewound from :type unwind_details: UnwindDetails """ if not self.state or not self.state.possibilities: return if unwind_details.unwinding_to_primary_requirement(): self._filter_possibilities_for_primary_unwind(unwind_details) else: self._filter_possibilities_for_parent_unwind(unwind_details) def _filter_possibilities_for_primary_unwind(self, unwind_details): """ Filter a state's possibilities to remove any that would not satisfy the requirements in the conflict we've just rewound from. :type unwind_details: UnwindDetails """ unwinds_to_state = [ uw for uw in self.state.unused_unwind_options if uw.state_index == unwind_details.state_index ] unwinds_to_state.append(unwind_details) unwind_requirement_sets = [ uw.conflicting_requirements for uw in unwinds_to_state ] possibilities = [] for possibility_set in self.state.possibilities: if not any([ any([ self._possibility_satisfies_requirements( poss, requirements ) ]) for poss in possibility_set.possibilities for requirements in unwind_requirement_sets ]): continue possibilities.append(possibility_set) self.state.possibilities = possibilities def _possibility_satisfies_requirements(self, possibility, requirements): name = self._provider.name_for(possibility) self.activated.tag('swap') if self.activated.vertex_named(name): self.activated.set_payload(name, possibility) satisfied = all([ self._provider.is_requirement_satisfied_by( r, self.activated, possibility ) for r in requirements ]) self.activated.rewind_to('swap') return satisfied def _filter_possibilities_for_parent_unwind(self, unwind_details: UnwindDetails): """ Filter a state's possibilities to remove any that would (eventually) the requirements in the conflict we've just rewound from. """ unwinds_to_state = [ uw for uw in self.state.unused_unwind_options if uw.state_index == unwind_details.state_index ] unwinds_to_state.append(unwind_details) primary_unwinds = unique([ uw for uw in unwinds_to_state if uw.unwinding_to_primary_requirement() ]) parent_unwinds = unique(unwinds_to_state) parent_unwinds = [uw for uw in parent_unwinds if uw not in primary_unwinds] allowed_possibility_sets = [] for unwind in primary_unwinds: for possibility_set in self._states[unwind.state_index].possibilities: if any([ self._possibility_satisfies_requirements( poss, unwind.conflicting_requirements ) for poss in possibility_set.possibilities ]): allowed_possibility_sets.append(possibility_set) requirements_to_avoid = list(flat_map( parent_unwinds, lambda x: x.sub_dependencies_to_avoid )) possibilities = [] for possibility_set in self.state.possibilities: if ( possibility_set in allowed_possibility_sets or [ r for r in requirements_to_avoid if r not in possibility_set.dependencies ] ): possibilities.append(possibility_set) self.state.possibilities = possibilities def _binding_requirements_for_conflict(self, conflict): """ Return the minimal list of requirements that would cause the passed conflict to occur. :rtype: list """ if conflict.possibility is None: return [conflict.requirement] possible_binding_requirements_set = list(conflict.requirements.values()) possible_binding_requirements = [] for reqs in possible_binding_requirements_set: if isinstance(reqs, list): possible_binding_requirements += reqs else: possible_binding_requirements.append(reqs) possible_binding_requirements = unique(possible_binding_requirements) # When there’s a `CircularDependency` error the conflicting requirement # (the one causing the circular) won’t be `conflict.requirement` # (which won’t be for the right state, because we won’t have created it, # because it’s circular). # We need to make sure we have that requirement in the conflict’s list, # otherwise we won’t be able to unwind properly, so we just return all # the requirements for the conflict. if conflict.underlying_error: return possible_binding_requirements possibilities = self._provider.search_for(conflict.requirement) # If all the requirements together don't filter out all possibilities, # then the only two requirements we need to consider are the initial one # (where the dependency's version was first chosen) and the last if self._binding_requirement_in_set( None, possible_binding_requirements, possibilities ): return list(filter(None, [ conflict.requirement, self._requirement_for_existing_name( self._provider.name_for(conflict.requirement) ) ])) # Loop through the possible binding requirements, removing each one # that doesn't bind. Use a reversed as we want the earliest set of # binding requirements. binding_requirements = copy(possible_binding_requirements) for req in reversed(possible_binding_requirements): if req == conflict.requirement: continue if not self._binding_requirement_in_set( req, binding_requirements, possibilities ): index = binding_requirements.index(req) del binding_requirements[index] return binding_requirements def _binding_requirement_in_set(self, requirement, possible_binding_requirements, possibilities) -> bool: """ Return whether or not the given requirement is required to filter out all elements of the list of possibilities. """ return any([ self._possibility_satisfies_requirements( poss, set(possible_binding_requirements) - set([requirement]) ) for poss in possibilities ]) def _parent_of(self, requirement): if not requirement: return if requirement not in self._parents_of: self._parents_of[requirement] = [] if not self._parents_of[requirement]: return try: index = self._parents_of[requirement][-1] except ValueError: return try: parent_state = self._states[index] except ValueError: return return parent_state.requirement def _requirement_for_existing_name(self, name): vertex = self.activated.vertex_named(name) if not vertex: return if not vertex.payload: return for s in self._states: if s.name == name: return s.requirement def _find_state_for(self, requirement): if not requirement: return for s in self._states: if s.requirement == requirement: return s def _create_conflict(self, underlying_error=None): vertex = self.activated.vertex_named(self.state.name) locked_requirement = self._locked_requirement_named(self.state.name) requirements = {} if vertex.explicit_requirements: requirements[self._provider.name_for_explicit_dependency_source] = vertex.explicit_requirements if locked_requirement: requirements[self._provider.name_for_locking_dependency_source] = [locked_requirement] for edge in vertex.incoming_edges: if edge.origin.payload.latest_version not in requirements: requirements[edge.origin.payload.latest_version] = [] requirements[edge.origin.payload.latest_version].insert(0, edge.requirement) activated_by_name = {} for v in self.activated: if v.payload: activated_by_name[v.name] = v.payload.latest_version conflict = Conflict( self.requirement, requirements, vertex.payload.latest_version if vertex.payload else None, self.possibility, locked_requirement, self.requirement_trees, activated_by_name, underlying_error ) self.state.conflicts[self.name] = conflict return conflict @property def requirement_trees(self): vertex = self.activated.vertex_named(self.state.name) return [self._requirement_tree_for(r) for r in vertex.requirements] def _requirement_tree_for(self, requirement): tree = [] while requirement: tree.insert(0, requirement) requirement = self._parent_of(requirement) return tree def _indicate_progress(self): self._iteration_counter += 1 progress_rate = self._ui.progress_rate or self._progress_rate if self._iteration_rate is None: if (datetime.now() - self._started_at).total_seconds() >= progress_rate: self._iteration_rate = self._iteration_counter if self._iteration_rate and (self._iteration_counter % self._iteration_rate) == 0: self._ui.indicate_progress() def _debug(self, message, depth=0): self._ui.debug(message, depth) def _attempt_to_activate(self): self._debug( f'Attempting to activate {str(self.possibility)}', self.state.depth, ) existing_vertex = self.activated.vertex_named(self.state.name) if existing_vertex.payload: self._debug( 'Found existing spec ({})'.format(existing_vertex.payload), self.state.depth ) self._attempt_to_filter_existing_spec(existing_vertex) else: latest = self.possibility.latest_version possibilities = [] for possibility in self.possibility.possibilities: if self._provider.is_requirement_satisfied_by( self.requirement, self.activated, possibility ): possibilities.append(possibility) self.possibility.possibilities = possibilities if self.possibility.latest_version is None: # ensure there's a possibility for better error messages if latest: self.possibility.possibilities.append(latest) self._create_conflict() self._unwind_for_conflict() else: self._activate_new_spec() def _attempt_to_filter_existing_spec(self, vertex): """ Attempt to update the existing vertex's `PossibilitySet` with a filtered version. """ filtered_set = self._filtered_possibility_set(vertex) if filtered_set.possibilities: self.activated.set_payload(self.name, filtered_set) new_requirements = copy(self.state.requirements) self._push_state_for_requirements(new_requirements, False) else: self._create_conflict() self._debug( f'Unsatisfied by existing spec ({str(vertex.payload)})', self.state.depth ) self._unwind_for_conflict() def _filtered_possibility_set(self, vertex): possibilities = [ p for p in vertex.payload.possibilities if p in self.possibility.possibilities ] return PossibilitySet( vertex.payload.dependencies, possibilities ) def _locked_requirement_named(self, requirement_name): vertex = self.base.vertex_named(requirement_name) if vertex: return vertex.payload def _activate_new_spec(self): if self.state.name in self.state.conflicts: del self.state.conflicts[self.name] self._debug( f'Activated {self.state.name} at {str(self.possibility)}', self.state.depth ) self.activated.set_payload(self.state.name, self.possibility) self._require_nested_dependencies_for(self.possibility) def _require_nested_dependencies_for(self, possibility_set): nested_dependencies = self._provider.dependencies_for( possibility_set.latest_version ) self._debug( f'Requiring nested dependencies ' f'({", ".join([str(d) for d in nested_dependencies])})', self.state.depth ) for d in nested_dependencies: self.activated.add_child_vertex( self._provider.name_for(d), None, [self._provider.name_for(possibility_set.latest_version)], d ) parent_index = len(self._states) - 1 if d not in self._parents_of: self._parents_of[d] = [] parents = self._parents_of[d] if not parents: parents.append(parent_index) self._push_state_for_requirements( self.state.requirements + nested_dependencies, len(nested_dependencies) > 0 ) def _push_state_for_requirements(self, new_requirements, requires_sort=True, new_activated=None): if new_activated is None: new_activated = self.activated if requires_sort: new_requirements = self._provider.sort_dependencies( unique(new_requirements), new_activated, self.state.conflicts ) while True: new_requirement = None if new_requirements: new_requirement = new_requirements.pop(0) if ( new_requirement is None or not any([ s.requirement == new_requirement for s in self._states ]) ): break new_name = '' if new_requirement: new_name = self._provider.name_for(new_requirement) possibilities = self._possibilities_for_requirement(new_requirement) self._handle_missing_or_push_dependency_state( DependencyState( new_name, new_requirements, new_activated, new_requirement, possibilities, self.state.depth, copy(self.state.conflicts), copy(self.state.unused_unwind_options) ) ) def _possibilities_for_requirement(self, requirement, activated=None): if activated is None: activated = self.activated if not requirement: return [] if self._locked_requirement_named(self._provider.name_for(requirement)): return self._locked_requirement_possibility_set( requirement, activated ) return self._group_possibilities( self._provider.search_for(requirement) ) def _locked_requirement_possibility_set(self, requirement, activated=None): if activated is None: activated = self.activated all_possibilities = self._provider.search_for(requirement) locked_requirement = self._locked_requirement_named( self._provider.name_for(requirement) ) # Longwinded way to build a possibilities list with either the locked # requirement or nothing in it. Required, since the API for # locked_requirement isn't guaranteed. locked_possibilities = [ possibility for possibility in all_possibilities if self._provider.is_requirement_satisfied_by( locked_requirement, activated, possibility ) ] return self._group_possibilities(locked_possibilities) def _group_possibilities(self, possibilities): possibility_sets = [] current_possibility_set = None for possibility in reversed(possibilities): dependencies = self._provider.dependencies_for(possibility) if current_possibility_set and current_possibility_set.dependencies == dependencies: current_possibility_set.possibilities.insert(0, possibility) else: possibility_sets.insert( 0, PossibilitySet(dependencies, [possibility]) ) current_possibility_set = possibility_sets[0] return possibility_sets def _handle_missing_or_push_dependency_state(self, state): if ( state.requirement and not state.possibilities and self._provider.allow_missing(state.requirement) ): state.activated.detach_vertex_named(state.name) self._push_state_for_requirements( copy(state.requirements), False, state.activated ) else: self._states.append(state) state.activated.tag(state) PKpVXLݸpoetry/mixology/resolver.pyfrom typing import Any from typing import List from typing import Union from .contracts import SpecificationProvider from .contracts import UI from .dependency_graph import DependencyGraph from .resolution import Resolution class Resolver: def __init__(self, specification_provider: SpecificationProvider, resolver_ui: UI): self._specification_provider = specification_provider self._resolver_ui = resolver_ui @property def specification_provider(self) -> SpecificationProvider: return self._specification_provider @property def ui(self) -> UI: return self._resolver_ui def resolve(self, requested: List[Any], base: Union[DependencyGraph, None] = None) -> DependencyGraph: if base is None: base = DependencyGraph() return Resolution( self._specification_provider, self._resolver_ui, requested, base ).resolve() PK3WZLQpoetry/mixology/state.pyfrom copy import copy from .dependency_graph import DependencyGraph class ResolutionState: def __init__(self, name, requirements, activated, requirement, possibilities, depth, conflicts, unused_unwind_options): self._name = name self._requirements = requirements self._activated = activated self._requirement = requirement self.possibilities = possibilities self._depth = depth self.conflicts = conflicts self.unused_unwind_options = unused_unwind_options @property def name(self): return self._name @property def requirements(self): return self._requirements @property def activated(self): return self._activated @property def requirement(self): return self._requirement @property def depth(self): return self._depth @classmethod def empty(cls): return cls(None, [], DependencyGraph(), None, None, 0, {}, []) def __repr__(self): return f'<{self.__class__.__name__} {self._name} ' \ f'({str(self.requirement)})>' class PossibilityState(ResolutionState): pass class DependencyState(ResolutionState): def pop_possibility_state(self): state = PossibilityState( self._name, copy(self._requirements), self._activated, self._requirement, [self.possibilities.pop() if self.possibilities else None], self._depth + 1, copy(self.conflicts), copy(self.unused_unwind_options) ) state.activated.tag(state) return state PKPZLs !poetry/mixology/unwind_details.pyfrom collections import namedtuple class UnwindDetails: def __init__(self, state_index, state_requirement, requirement_tree, conflicting_requirements, requirement_trees, requirements_unwound_to_instead): self.state_index = state_index self.state_requirement = state_requirement self.requirement_tree = requirement_tree self.conflicting_requirements = conflicting_requirements self.requirement_trees = requirement_trees self.requirements_unwound_to_instead = requirements_unwound_to_instead self._reversed_requirement_tree_index = None self._sub_dependencies_to_avoid = None self._all_requirements = None @property def reversed_requirement_tree_index(self): if self._reversed_requirement_tree_index is None: if self.state_requirement: self._reversed_requirement_tree_index = list(reversed( self.requirement_tree )).index(self.state_requirement) else: self._reversed_requirement_tree_index = 999999 return self._reversed_requirement_tree_index def unwinding_to_primary_requirement(self): return self.requirement_tree[-1] == self.state_requirement @property def sub_dependencies_to_avoid(self): if self._sub_dependencies_to_avoid is None: self._sub_dependencies_to_avoid = [] for tree in self.requirement_trees: try: index = tree.index(self.state_requirement) except ValueError: continue if tree[index + 1] is not None: self._sub_dependencies_to_avoid.append(tree[index + 1]) return self._sub_dependencies_to_avoid @property def all_requirements(self): if self._all_requirements is None: self._all_requirements = [ x for tree in self.requirement_trees for x in tree ] return self._all_requirements def __eq__(self, other): if not isinstance(other, UnwindDetails): return NotImplemented return ( self.state_index == other.state_index and ( self.reversed_requirement_tree_index == other.reversed_requirement_tree_index ) ) def __lt__(self, other): if not isinstance(other, UnwindDetails): return NotImplemented return self.state_index < other.state_index def __le__(self, other): if not isinstance(other, UnwindDetails): return NotImplemented return self.state_index <= other.state_index def __gt__(self, other): if not isinstance(other, UnwindDetails): return NotImplemented return self.state_index > other.state_index def __ge__(self, other): if not isinstance(other, UnwindDetails): return NotImplemented return self.state_index >= other.state_index def __hash__(self): return hash((id(self), self.state_index, self.state_requirement)) PKQXLgծffpoetry/mixology/utils.pydef unique(l): used = set() return [x for x in l if x not in used and (used.add(x) or True)] PKap\L_<poetry/packages/__init__.pyfrom .dependency import Dependency from .locker import Locker from .package import Package from .vcs_dependency import VCSDependency PKrfLyY[poetry/packages/dependency.pyimport poetry.packages from poetry.semver.constraints import Constraint from poetry.semver.constraints import MultiConstraint from poetry.semver.constraints.base_constraint import BaseConstraint from poetry.semver.version_parser import VersionParser class Dependency: def __init__(self, name: str, constraint: str, optional: bool = False, category: str = 'main', allows_prereleases: bool = False): self._name = name.lower() self._pretty_name = name self._parser = VersionParser() try: self._constraint = self._parser.parse_constraints(constraint) except ValueError: self._constraint = self._parser.parse_constraints('*') self._pretty_constraint = constraint self._optional = optional self._category = category self._allows_prereleases = allows_prereleases self._python_versions = '*' self._python_constraint = self._parser.parse_constraints('*') self._platform = '*' self._platform_constraint = self._parser.parse_constraints('*') self._extras = [] @property def name(self): return self._name @property def constraint(self): return self._constraint @property def pretty_constraint(self): return self._pretty_constraint @property def pretty_name(self): return self._pretty_name @property def category(self): return self._category @property def python_versions(self): return self._python_versions @python_versions.setter def python_versions(self, value: str): self._python_versions = value self._python_constraint = self._parser.parse_constraints(value) @property def python_constraint(self): return self._python_constraint @property def platform(self) -> str: return self._platform @platform.setter def platform(self, value: str): self._platform = value @property def platform_constraint(self): return self._platform_constraint @property def extras(self) -> list: return self._extras def allows_prereleases(self): return self._allows_prereleases def is_optional(self): return self._optional def is_vcs(self): return False def accepts(self, package: 'poetry.packages.Package') -> bool: """ Determines if the given package matches this dependency. """ return ( self._name == package.name and self._constraint.matches(Constraint('=', package.version)) and (not package.is_prerelease() or self.allows_prereleases()) ) def to_pep_508(self) -> str: requirement = f'{self.pretty_name}' if isinstance(self.constraint, MultiConstraint): requirement += ' ({})'.format(','.join( [str(c).replace(' ', '') for c in self.constraint.constraints] )) else: requirement += ' ({})'.format(str(self.constraint).replace(' ', '')) # Markers markers = [] # Python marker if self.python_versions != '*': python_constraint = self.python_constraint markers.append(self._create_nested_marker('python_version', python_constraint)) if markers: requirement += f'; {" and ".join(markers)}' return requirement def _create_nested_marker(self, name, constraint): if isinstance(constraint, MultiConstraint): parts = [] for c in constraint.constraints: parts.append(self._create_nested_marker(name, c)) glue = ' and ' if constraint.is_disjunctive(): parts = [f'({part})' for part in parts] glue = ' or ' marker = glue.join(parts) else: marker = f'{name}{constraint.string_operator}"{constraint.version}"' return marker def activate(self): """ Set the dependency as mandatory. """ self._optional = False def __eq__(self, other): if not isinstance(other, Dependency): return NotImplemented return self._name == other.name and self._constraint == other.constraint def __hash__(self): return hash((self._name, self._pretty_constraint)) def __str__(self): return f'{self._pretty_name} ({self._pretty_constraint})' def __repr__(self): return f'' PKfLHHpoetry/packages/locker.pyimport json import poetry.packages from hashlib import sha256 from pathlib import Path from typing import List from poetry.repositories import Repository from poetry.utils.toml_file import TomlFile class Locker: _relevant_keys = [ 'name', 'version', 'python-versions', 'platform', 'dependencies', 'dev-dependencies', 'source', ] def __init__(self, lock: Path, local_config: dict): self._lock = TomlFile(lock) self._local_config = local_config self._lock_data = None self._content_hash = self._get_content_hash() @property def lock(self) -> TomlFile: return self._lock @property def lock_data(self): if self._lock_data is None: self._lock_data = self._get_lock_data() return self._lock_data def is_locked(self) -> bool: """ Checks whether the locker has been locked (lockfile found). """ if not self._lock.exists(): return False return 'package' in self.lock_data def is_fresh(self) -> bool: """ Checks whether the lock file is still up to date with the current hash. """ lock = self._lock.read(True) metadata = lock.get('metadata', {}) if 'content-hash' in metadata: return self._content_hash == lock['metadata']['content-hash'] return False def locked_repository(self, with_dev_reqs: bool = False) -> Repository: """ Searches and returns a repository of locked packages. """ if not self.is_locked(): return Repository() lock_data = self.lock_data packages = Repository() if with_dev_reqs: locked_packages = lock_data['package'] else: locked_packages = [ p for p in lock_data['package'] if p['category'] == 'main' ] if not locked_packages: return packages for info in locked_packages: package = poetry.packages.Package( info['name'], info['version'], info['version'] ) package.description = info.get('description', '') package.category = info['category'] package.optional = info['optional'] package.hashes = lock_data['metadata']['hashes'][info['name']] package.python_versions = info['python-versions'] for dep_name, constraint in info.get('dependencies', {}).items(): package.add_dependency(dep_name, constraint) if 'source' in info: package.source_type = info['source']['type'] package.source_url = info['source']['url'] package.source_reference = info['source']['reference'] packages.add_package(package) return packages def set_lock_data(self, root, packages) -> bool: hashes = {} packages = self._lock_packages(packages) # Retrieving hashes for package in packages: hashes[package['name']] = package['hashes'] del package['hashes'] lock = { 'package': packages, 'metadata': { 'python-versions': root.python_versions, 'platform': root.platform, 'content-hash': self._content_hash, 'hashes': hashes, } } if root.extras: lock['extras'] = { extra: [dep.pretty_name for dep in deps] for extra, deps in root.extras.items() } if not self.is_locked() or lock != self.lock_data: self._write_lock_data(lock) return True return False def _write_lock_data(self, data): self._lock.write(data) self._lock_data = None def _get_content_hash(self) -> str: """ Returns the sha256 hash of the sorted content of the composer file. """ content = self._local_config relevant_content = {} for key in self._relevant_keys: relevant_content[key] = content.get(key) content_hash = sha256( json.dumps(relevant_content, sort_keys=True).encode() ).hexdigest() return content_hash def _get_lock_data(self) -> dict: if not self._lock.exists(): raise RuntimeError( 'No lockfile found. Unable to read locked packages' ) return self._lock.read(True) def _lock_packages(self, packages: List['poetry.packages.Package']) -> list: locked = [] for package in sorted(packages, key=lambda x: x.name): spec = self._dump_package(package) locked.append(spec) return locked def _dump_package(self, package: 'poetry.packages.Package') -> dict: dependencies = {} for dependency in package.requires: if dependency.is_optional(): continue dependencies[dependency.pretty_name] = dependency.pretty_constraint data = { 'name': package.pretty_name, 'version': package.pretty_version, 'description': package.description, 'category': package.category, 'optional': package.optional, 'python-versions': package.python_versions, 'platform': package.platform, 'hashes': package.hashes, 'dependencies': dependencies } if package.source_type: data['source'] = { 'type': package.source_type, 'url': package.source_url, 'reference': package.source_reference } if package.requirements: data['requirements'] = package.requirements return data PKqhL4"u--poetry/packages/package.pyimport re from typing import Union from poetry.semver.constraints import Constraint from poetry.semver.helpers import parse_stability from poetry.semver.version_parser import VersionParser from poetry.version import parse as parse_version from .dependency import Dependency from .vcs_dependency import VCSDependency AUTHOR_REGEX = re.compile('(?u)^(?P[- .,\w\d\'’"()]+) <(?P.+?)>$') class Package: AVAILABLE_PYTHONS = { '2', '2.7', '3', '3.4', '3.5', '3.6', '3.7' } supported_link_types = { 'require': { 'description': 'requires', 'method': 'requires' }, 'provide': { 'description': 'provides', 'method': 'provides' } } STABILITY_STABLE = 0 STABILITY_RC = 5 STABILITY_BETA = 10 STABILITY_ALPHA = 15 STABILITY_DEV = 20 stabilities = { 'stable': STABILITY_STABLE, 'rc': STABILITY_RC, 'beta': STABILITY_BETA, 'alpha': STABILITY_ALPHA, 'dev': STABILITY_DEV, } def __init__(self, name, version, pretty_version=None): """ Creates a new in memory package. """ self._pretty_name = name self._name = name.lower() self._version = str(parse_version(version)) self._pretty_version = pretty_version or version self.description = '' self._stability = parse_stability(version) self._dev = self._stability == 'dev' self._authors = [] self.homepage = None self.repository_url = None self.keywords = [] self.license = None self.readme = '' self.source_type = '' self.source_reference = '' self.source_url = '' self.requires = [] self.dev_requires = [] self.extras = {} self._parser = VersionParser() self.category = 'main' self.hashes = [] self.optional = False # Requirements for making it mandatory self.requirements = {} self._python_versions = '*' self._python_constraint = self._parser.parse_constraints('*') self._platform = '*' self._platform_constraint = self._parser.parse_constraints('*') @property def name(self): return self._name @property def pretty_name(self): return self._pretty_name @property def version(self): return self._version @property def pretty_version(self): return self._pretty_version @property def unique_name(self): return self.name + '-' + self._version @property def pretty_string(self): return self.pretty_name + ' ' + self.pretty_version @property def full_pretty_version(self): if not self._dev and self.source_type not in ['hg', 'git']: return self._pretty_version # if source reference is a sha1 hash -- truncate if len(self.source_reference) == 40: return '{} {}'.format(self._pretty_version, self.source_reference[0:7]) return '{} {}'.format(self._pretty_version, self.source_reference) @property def authors(self) -> list: return self._authors @property def author_name(self) -> str: return self._get_author()['name'] @property def author_email(self) -> str: return self._get_author()['email'] def _get_author(self) -> dict: if not self._authors: return { 'name': None, 'email': None } m = AUTHOR_REGEX.match(self._authors[0]) name = m.group('name') email = m.group('email') return { 'name': name, 'email': email } @property def python_versions(self): return self._python_versions @python_versions.setter def python_versions(self, value: str): self._python_versions = value self._python_constraint = self._parser.parse_constraints(value) @property def python_constraint(self): return self._python_constraint @property def platform(self) -> str: return self._platform @platform.setter def platform(self, value: str): self._platform = value self._platform_constraint = self._parser.parse_constraints(value) @property def platform_constraint(self): return self._platform_constraint @property def classifiers(self): classifiers = [] # Automatically set python classifiers parser = VersionParser() if self.python_versions == '*': python_constraint = parser.parse_constraints('~2.7 || ^3.4') else: python_constraint = self.python_constraint for version in sorted(self.AVAILABLE_PYTHONS): if len(version) == 1: constraint = parser.parse_constraints(version + '.*') else: constraint = Constraint('=', version) if python_constraint.matches(constraint): classifiers.append( f'Programming Language :: Python :: {version}' ) return classifiers def is_dev(self): return self._dev def is_prerelease(self): return self._stability != 'stable' def add_dependency(self, name: str, constraint: Union[str, dict, None] = None, category: str = 'main') -> Dependency: if constraint is None: constraint = '*' if isinstance(constraint, dict): if 'git' in constraint: # VCS dependency optional = constraint.get('optional', False) python_versions = constraint.get('python') platform = constraint.get('platform') dependency = VCSDependency( name, 'git', constraint['git'], branch=constraint.get('branch', None), tag=constraint.get('tag', None), rev=constraint.get('rev', None), optional=optional, ) if python_versions: dependency.python_versions = python_versions if platform: dependency.platform = platform else: version = constraint['version'] optional = constraint.get('optional', False) allows_prereleases = constraint.get('allows_prereleases', False) python_versions = constraint.get('python') platform = constraint.get('platform') dependency = Dependency( name, version, optional=optional, category=category, allows_prereleases=allows_prereleases ) if python_versions: dependency.python_versions = python_versions if platform: dependency.platform = platform if 'extras' in constraint: for extra in constraint['extras']: dependency.extras.append(extra) else: dependency = Dependency(name, constraint, category=category) if category == 'dev': self.dev_requires.append(dependency) else: self.requires.append(dependency) return dependency def __hash__(self): return hash((self._name, self._version)) def __eq__(self, other): if not isinstance(other, Package): return NotImplemented return self._name == other.name and self._version == other.version def __str__(self): return self.unique_name def __repr__(self): return ''.format(self.unique_name) PK/UfLm!poetry/packages/vcs_dependency.pyfrom .dependency import Dependency class VCSDependency(Dependency): """ Represents a VCS dependency """ def __init__(self, name, vcs, source, branch=None, tag=None, rev=None, optional=False): self._vcs = vcs self._source = source if not any([branch, tag, rev]): # If nothing has been specified, we assume master branch = 'master' self._branch = branch self._tag = tag self._rev = rev super().__init__( name, '*', optional=optional, allows_prereleases=True ) @property def vcs(self) -> str: return self._vcs @property def source(self): return self._source @property def branch(self): return self._branch @property def tag(self): return self._tag @property def rev(self): return self._rev @property def reference(self) -> str: return self._branch or self._tag or self._rev @property def pretty_constraint(self) -> str: if self._branch: what = 'branch' version = self._branch elif self._tag: what = 'tag' version = self._tag else: what = 'rev' version = self._rev return f'{what} {version}' def is_vcs(self) -> bool: return True def accepts_prereleases(self): return True PKjgL poetry/poetry.pyfrom pathlib import Path from .__version__ import __version__ from .packages import Dependency from .packages import Locker from .packages import Package from .repositories import Pool from .repositories.pypi_repository import PyPiRepository from .utils.toml_file import TomlFile class Poetry: VERSION = __version__ def __init__(self, file: Path, config: dict, package: Package, locker: Locker): self._file = TomlFile(file) self._package = package self._config = config self._locker = locker # Configure sources self._pool = Pool() for source in self._config.get('source', []): self._pool.configure(source) # Always put PyPI last to prefere private repositories self._pool.add_repository(PyPiRepository()) @property def file(self): return self._file @property def package(self) -> Package: return self._package @property def config(self) -> dict: return self._config @property def locker(self) -> Locker: return self._locker @property def pool(self) -> Pool: return self._pool @classmethod def create(cls, cwd) -> 'Poetry': poetry_file = Path(cwd) / 'pyproject.toml' if not poetry_file.exists(): raise RuntimeError( f'Poetry could not find a pyproject.toml file in {cwd}' ) # TODO: validate file content local_config = TomlFile(poetry_file.as_posix()).read(True) if 'tool' not in local_config or 'poetry' not in local_config['tool']: raise RuntimeError( f'[tool.poetry] section not found in {poetry_file.name}' ) local_config = local_config['tool']['poetry'] # Load package name = local_config['name'] version = local_config['version'] package = Package(name, version, version) for author in local_config['authors']: package.authors.append(author) package.description = local_config.get('description', '') package.homepage = local_config.get('homepage') package.repository_url = local_config.get('repository') package.license = local_config.get('license') package.keywords = local_config.get('keywords', []) if 'readme' in local_config: with open(poetry_file.parent / local_config['readme']) as f: package.readme = f.read() if 'platform' in local_config: package.platform = local_config['platform'] if 'dependencies' in local_config: for name, constraint in local_config['dependencies'].items(): if name.lower() == 'python': package.python_versions = constraint continue package.add_dependency(name, constraint) if 'dev-dependencies' in local_config: for name, constraint in local_config['dev-dependencies'].items(): package.add_dependency(name, constraint, category='dev') if 'extras' in local_config: for extra_name, requirements in local_config['extras'].items(): package.extras[extra_name] = [ Dependency(req, '*') for req in requirements ] locker = Locker(poetry_file.with_suffix('.lock'), local_config) return cls(poetry_file, local_config, package, locker) PKKcZL-;}poetry/puzzle/__init__.pyfrom .solver import Solver PKhZLKpoetry/puzzle/exceptions.pyclass SolverProblemError(Exception): def __init__(self, error): self._error = error super().__init__(str(error)) @property def error(self): return self._error PK}UVL,MYY$poetry/puzzle/operations/__init__.pyfrom .install import Install from .uninstall import Uninstall from .update import Update PKkuVLÏ#poetry/puzzle/operations/install.pyfrom .operation import Operation class Install(Operation): def __init__(self, package, reason: str = None) -> None: super().__init__(reason) self._package = package @property def package(self): return self._package @property def job_type(self): return 'install' def __str__(self) -> str: return 'Installing {} ({})'.format( self.package.pretty_name, self.format_version(self.package) ) def __repr__(self): return ''.format( self.package.pretty_name, self.format_version(self.package) ) PKafL/ %poetry/puzzle/operations/operation.py# -*- coding: utf-8 -*- class Operation: def __init__(self, reason: str = None) -> None: self._reason = reason self._skipped = False self._skip_reason = None @property def job_type(self) -> str: raise NotImplementedError @property def reason(self) -> str: return self._reason @property def skipped(self) -> bool: return self._skipped @property def skip_reason(self): return self._skip_reason def format_version(self, package) -> str: return package.full_pretty_version def skip(self, reason: str) -> None: self._skipped = True self._skip_reason = reason PKiuVLct%poetry/puzzle/operations/uninstall.pyfrom .operation import Operation class Uninstall(Operation): def __init__(self, package, reason=None): super(Uninstall, self).__init__(reason) self._package = package @property def package(self): return self._package @property def job_type(self): return 'uninstall' def __str__(self): return 'Uninstalling {} ({})'.format( self.package.pretty_name, self.format_version(self._package) ) def __repr__(self): return ''.format( self.package.pretty_name, self.format_version(self.package) ) PKfuVLoo"poetry/puzzle/operations/update.pyfrom .operation import Operation class Update(Operation): def __init__(self, initial, target, reason=None): self._initial_package = initial self._target_package = target super(Update, self).__init__(reason) @property def initial_package(self): return self._initial_package @property def target_package(self): return self._target_package @property def job_type(self): return 'update' def __str__(self): return ( 'Updating {} ({}) to {} ({})'.format( self.initial_package.pretty_name, self.format_version(self.initial_package), self.target_package.pretty_name, self.format_version(self.target_package) ) ) def __repr__(self): return ( ''.format( self.initial_package.pretty_name, self.format_version(self.initial_package), self.target_package.pretty_name, self.format_version(self.target_package) ) ) PKgLv!s DDpoetry/puzzle/provider.pyimport os import shutil from functools import cmp_to_key from pathlib import Path from tempfile import mkdtemp from typing import Dict from typing import List from poetry.mixology import DependencyGraph from poetry.mixology.conflict import Conflict from poetry.mixology.contracts import SpecificationProvider from poetry.packages import Dependency from poetry.packages import Package from poetry.packages import VCSDependency from poetry.repositories import Pool from poetry.semver import less_than from poetry.utils.toml_file import TomlFile from poetry.utils.venv import Venv from poetry.vcs.git import Git class Provider(SpecificationProvider): UNSAFE_PACKAGES = {'setuptools', 'distribute', 'pip'} def __init__(self, package: Package, pool: Pool): self._package = package self._pool = pool self._python_constraint = package.python_constraint @property def pool(self) -> Pool: return self._pool @property def name_for_explicit_dependency_source(self) -> str: return 'poetry.toml' @property def name_for_locking_dependency_source(self) -> str: return 'poetry.lock' def name_for(self, dependency: Dependency) -> str: """ Returns the name for the given dependency. """ return dependency.name def search_for(self, dependency: Dependency) -> List[Package]: """ Search for the specifications that match the given dependency. The specifications in the returned list will be considered in reverse order, so the latest version ought to be last. """ if dependency.is_vcs(): return self.search_for_vcs(dependency) packages = self._pool.find_packages( dependency.name, dependency.constraint, extras=dependency.extras, ) packages.sort( key=cmp_to_key( lambda x, y: 0 if x.version == y.version else -1 * int(less_than(x.version, y.version) or -1) ) ) return packages def search_for_vcs(self, dependency: VCSDependency) -> List[Package]: """ Search for the specifications that match the given VCS dependency. Basically, we clone the repository in a temporary directory and get the information we need by checking out the specified reference. """ if dependency.vcs != 'git': raise ValueError(f'Unsupported VCS dependency {dependency.vcs}') tmp_dir = Path(mkdtemp(prefix=f'pypoetry-git-{dependency.name}')) try: git = Git() git.clone(dependency.source, tmp_dir) git.checkout(dependency.reference, tmp_dir) revision = git.rev_parse( dependency.reference, tmp_dir ).strip() if dependency.tag or dependency.rev: revision = dependency.reference poetry = TomlFile(tmp_dir / 'poetry.toml') if poetry.exists(): # If a poetry.toml file exists # We use it to get the information we need info = poetry.read() name = info['package']['name'] version = info['package']['version'] package = Package(name, version, version) for req_name, req_constraint in info['dependencies'].items(): package.add_dependency(req_name, req_constraint) else: # We need to use setup.py here # to figure the information we need # We need to place ourselves in the proper # folder for it to work current_dir = os.getcwd() os.chdir(tmp_dir.as_posix()) try: venv = Venv.create() output = venv.run( 'python', 'setup.py', '--name', '--version' ) output = output.split('\n') name = output[-3] version = output[-2] package = Package(name, version, version) # Figure out a way to get requirements except Exception: raise finally: os.chdir(current_dir) package.source_type = 'git' package.source_url = dependency.source package.source_reference = revision except Exception: raise finally: shutil.rmtree(tmp_dir.as_posix()) return [package] def dependencies_for(self, package: Package): if package.source_type == 'git': # Information should already be set pass else: package = self._pool.package(package.name, package.version) return [ r for r in package.requires if not r.is_optional() and r.name not in self.UNSAFE_PACKAGES ] def is_requirement_satisfied_by(self, requirement: Dependency, activated: DependencyGraph, package: Package) -> bool: """ Determines whether the given requirement is satisfied by the given spec, in the context of the current activated dependency graph. """ if isinstance(requirement, Package): return requirement == package if not requirement.accepts(package): return False if package.is_prerelease() and not requirement.allows_prereleases(): vertex = activated.vertex_named(package.name) if not any([r.allows_prereleases() for r in vertex.requirements]): return False return self._package.python_constraint.matches(package.python_constraint) def sort_dependencies(self, dependencies: List[Dependency], activated: DependencyGraph, conflicts: Dict[str, List[Conflict]]): return sorted(dependencies, key=lambda d: [ 0 if activated.vertex_named(d.name).payload else 1, 0 if d.allows_prereleases() else 1, 0 if d.name in conflicts else 1, 0 if activated.vertex_named(d.name).payload else len(self.search_for(d)) ]) PKMgL#֮poetry/puzzle/solver.pyfrom typing import List from poetry.mixology import Resolver from poetry.mixology.dependency_graph import DependencyGraph from poetry.mixology.exceptions import ResolverError from poetry.semver.version_parser import VersionParser from .exceptions import SolverProblemError from .operations import Install from .operations import Uninstall from .operations import Update from .operations.operation import Operation from .provider import Provider from .ui import UI class Solver: def __init__(self, package, pool, locked, io): self._package = package self._pool = pool self._locked = locked self._io = io def solve(self, requested, fixed=None, extras=None) -> List[Operation]: resolver = Resolver(Provider(self._package, self._pool), UI(self._io)) base = None if fixed is not None: base = DependencyGraph() for fixed_req in fixed: base.add_vertex(fixed_req.name, fixed_req, True) try: graph = resolver.resolve(requested, base=base) except ResolverError as e: raise SolverProblemError(e) packages = [v.payload for v in graph.vertices.values()] # Setting info for vertex in graph.vertices.values(): tags = self._get_tags_for_vertex(vertex, requested) if 'main' in tags['category']: vertex.payload.category = 'main' else: vertex.payload.category = 'dev' if not tags['optional']: vertex.payload.optional = False else: vertex.payload.optional = True # Finding the less restrictive requirements requirements = {} parser = VersionParser() for req_name, reqs in tags['requirements'].items(): for req in reqs: if req_name == 'python': if 'python' not in requirements: requirements['python'] = req continue previous = parser.parse_constraints(requirements['python']) current = parser.parse_constraints(req) if current.matches(previous): requirements['python'] = req if 'platform' in req: if 'platform' not in requirements: requirements['platform'] = req continue vertex.payload.requirements = requirements operations = [] for package in packages: installed = False for pkg in self._locked.packages: if package.name == pkg.name: installed = True # Checking version if package.version != pkg.version: operations.append(Update(pkg, package)) break if not installed: operations.append(Install(package)) # Checking for removals for pkg in self._locked.packages: remove = True for package in packages: if pkg.name == package.name: remove = False break if remove: operations.append(Uninstall(pkg)) return list(reversed(operations)) def _get_tags_for_vertex(self, vertex, requested): tags = { 'category': [], 'optional': True, 'requirements': { 'python': [], 'platform': [] } } if not vertex.incoming_edges: # Original dependency for req in requested: if req.name == vertex.name: tags['category'].append(req.category) if not req.is_optional(): tags['optional'] = False if req.python_versions != '*': tags['requirements']['python'].append(str(req.python_constraint)) if req.platform != '*': tags['requirements']['platform'].append(str(req.platform_constraint)) break else: for edge in vertex.incoming_edges: sub_tags = self._get_tags_for_vertex(edge.origin, requested) tags['category'] += sub_tags['category'] tags['optional'] = tags['optional'] and sub_tags['optional'] requirements = sub_tags['requirements'] tags['requirements']['python'] += requirements.get('python', []) tags['requirements']['platform'] += requirements.get('platform', []) return tags PKhYLpoetry/puzzle/ui.pyfrom cleo.styles import CleoStyle from cleo.helpers import ProgressIndicator from poetry.mixology.contracts import UI as BaseUI class UI(BaseUI): def __init__(self, io: CleoStyle): self._io = io self._progress = None super().__init__(self._io.is_debug()) @property def output(self): return self._io def before_resolution(self) -> None: self._io.write('Resolving dependencies') if self.is_debugging(): self._io.new_line() def indicate_progress(self): if not self.is_debugging(): self._io.write('.') def after_resolution(self) -> None: self._io.new_line() def debug(self, message, depth) -> None: if self.is_debugging(): debug_info = str(message) debug_info = '\n'.join([ ':{}: {}'.format(str(depth).rjust(4), s) for s in debug_info.split('\n') ]) + '\n' self.output.write(debug_info) PK\Ly::poetry/repositories/__init__.pyfrom .pool import Pool from .repository import Repository PKzfL } g&poetry/repositories/base_repository.pyclass BaseRepository: SEARCH_FULLTEXT = 0 SEARCH_NAME = 1 def __init__(self): self._packages = [] @property def packages(self): return self._packages def has_package(self, package): raise NotImplementedError() def package(self, name, version): raise NotImplementedError() def find_packages(self, name, constraint=None, extras=None): raise NotImplementedError() def search(self, query, mode=SEARCH_FULLTEXT): raise NotImplementedError() PKQ^[LHgg+poetry/repositories/installed_repository.pyfrom poetry.packages import Package from poetry.utils.venv import Venv from .repository import Repository class InstalledRepository(Repository): @classmethod def load(cls, venv: Venv) -> 'InstalledRepository': """ Load installed packages. For now, it uses the pip "freeze" command. """ repo = cls() freeze_output = venv.run('pip', 'freeze') for line in freeze_output.split('\n'): if '==' in line: name, version = line.split('==') repo.add_package(Package(name, version, version)) return repo PK{fLr:Q22(poetry/repositories/legacy_repository.pyimport re from pathlib import Path from piptools.cache import DependencyCache from piptools.repositories import PyPIRepository from piptools.resolver import Resolver from piptools.scripts.compile import get_pip_command from pip.req import InstallRequirement from pip.exceptions import InstallationError from cachy import CacheManager import poetry.packages from poetry.locations import CACHE_DIR from poetry.semver.constraints import Constraint from poetry.semver.constraints.base_constraint import BaseConstraint from poetry.semver.version_parser import VersionParser from .pypi_repository import PyPiRepository class LegacyRepository(PyPiRepository): def __init__(self, name, url): if name == 'pypi': raise ValueError('The name [pypi] is reserved for repositories') self._name = name self._url = url command = get_pip_command() opts, _ = command.parse_args([]) self._session = command._build_session(opts) self._repository = PyPIRepository(opts, self._session) self._cache_dir = Path(CACHE_DIR) / 'cache' / 'repositories' / name self._cache = CacheManager({ 'default': 'releases', 'serializer': 'json', 'stores': { 'releases': { 'driver': 'file', 'path': Path(CACHE_DIR) / 'cache' / 'repositories' / name }, 'packages': { 'driver': 'dict' }, 'matches': { 'driver': 'dict' } } }) def find_packages(self, name, constraint=None, extras=None): packages = [] if constraint is not None and not isinstance(constraint, BaseConstraint): version_parser = VersionParser() constraint = version_parser.parse_constraints(constraint) key = name if constraint: key = f'{key}:{str(constraint)}' if self._cache.store('matches').has(key): versions = self._cache.store('matches').get(key) else: candidates = [str(c.version) for c in self._repository.find_all_candidates(name)] versions = [] for version in candidates: if version in versions: continue if ( not constraint or (constraint and constraint.matches(Constraint('=', version))) ): versions.append(version) self._cache.store('matches').put(key, versions, 5) for version in versions: packages.append(self.package(name, version, extras=extras)) return packages def package(self, name, version, extras=None) -> 'poetry.packages.Package': """ Retrieve the release information. This is a heavy task which takes time. We have to download a package to get the dependencies. We also need to download every file matching this release to get the various hashes. Note that, this will be cached so the subsequent operations should be much faster. """ try: index = self._packages.index( poetry.packages.Package(name, version, version) ) return self._packages[index] except ValueError: if extras is None: extras = [] release_info = self.get_release_info(name, version) package = poetry.packages.Package(name, version, version) for req in release_info['requires_dist']: req = InstallRequirement.from_line(req) name = req.name version = str(req.req.specifier) dependency = Dependency( name, version, optional=req.markers ) is_extra = False if req.markers: # Setting extra dependencies and requirements requirements = self._convert_markers( req.markers._markers ) if 'python_version' in requirements: ors = [] for or_ in requirements['python_version']: ands = [] for op, version in or_: ands.append(f'{op}{version}') ors.append(' '.join(ands)) dependency.python_versions = ' || '.join(ors) if 'sys_platform' in requirements: ors = [] for or_ in requirements['sys_platform']: ands = [] for op, platform in or_: ands.append(f'{op}{platform}') ors.append(' '.join(ands)) dependency.platform = ' || '.join(ors) if 'extra' in requirements: is_extra = True for _extras in requirements['extra']: for _, extra in _extras: if extra not in package.extras: package.extras[extra] = [] package.extras[extra].append(dependency) if not is_extra: package.requires.append(dependency) # Adding description package.description = release_info.get('summary', '') # Adding hashes information package.hashes = release_info['digests'] # Activate extra dependencies for extra in extras: if extra in package.extras: for dep in package.extras[extra]: dep.activate() package.requires += package.extras[extra] self._packages.append(package) return package def get_release_info(self, name: str, version: str) -> dict: """ Return the release information given a package name and a version. The information is returned from the cache if it exists or retrieved from the remote server. """ return self._cache.store('releases').remember_forever( f'{name}:{version}', lambda: self._get_release_info(name, version) ) def _get_release_info(self, name: str, version: str) -> dict: ireq = InstallRequirement.from_line(f'{name}=={version}') resolver = Resolver( [ireq], self._repository, cache=DependencyCache(self._cache_dir.as_posix()) ) try: requirements = list(resolver._iter_dependencies(ireq)) except InstallationError as e: # setup.py egg-info error most likely # So we assume no dependencies requirements = [] requires = [] for dep in requirements: constraint = str(dep.req.specifier) require = f'{dep.name}' if constraint: require += f' ({constraint})' requires.append(require) hashes = resolver.resolve_hashes([ireq])[ireq] hashes = [h.split(':')[1] for h in hashes] data = { 'name': name, 'version': version, 'summary': '', 'requires_dist': requires, 'digests': hashes } resolver.repository.freshen_build_caches() return data PK#|fLBJ J poetry/repositories/pool.pyfrom typing import List from typing import Union import poetry.packages from .base_repository import BaseRepository from .repository import Repository class Pool(BaseRepository): def __init__(self, repositories: Union[list, None] = None): if repositories is None: repositories = [] self._repositories = [] for repository in repositories: self.add_repository(repository) super().__init__() @property def repositories(self) -> List[Repository]: return self._repositories def add_repository(self, repository: Repository) -> 'Pool': """ Adds a repository to the pool. """ self._repositories.append(repository) return self def configure(self, source: dict) -> 'Pool': """ Configures a repository based on a source specification and add it to the pool. """ from .legacy_repository import LegacyRepository if 'url' in source: # PyPI-like repository if 'name' not in source: raise RuntimeError('Missing [name] in source.') repository = LegacyRepository(source['name'], source['url']) else: raise RuntimeError('Unsupported source specified') return self.add_repository(repository) def has_package(self, package): raise NotImplementedError() def package(self, name, version) -> Union['poetry.packages.Package', None]: package = poetry.packages.Package(name, version, version) if package in self._packages: return self._packages[self._packages.index(package)] for repository in self._repositories: package = repository.package(name, version) if package: self._packages.append(package) return package return None def find_packages(self, name, constraint=None, extras=None) -> List['poetry.packages.Package']: for repository in self._repositories: packages = repository.find_packages(name, constraint, extras=extras) if packages: return packages return [] def search(self, query, mode=BaseRepository.SEARCH_FULLTEXT): raise NotImplementedError() PKgLV;x$x$&poetry/repositories/pypi_repository.pyfrom pathlib import Path from pip.req import InstallRequirement from typing import List from typing import Union from cachy import CacheManager from requests import get from poetry.locations import CACHE_DIR from poetry.packages import Dependency from poetry.packages import Package from poetry.semver.constraints import Constraint from poetry.semver.constraints.base_constraint import BaseConstraint from poetry.semver.version_parser import VersionParser from .repository import Repository class PyPiRepository(Repository): def __init__(self, url='https://pypi.org/', disable_cache=False): self._url = url self._disable_cache = disable_cache self._cache = CacheManager({ 'default': 'releases', 'serializer': 'json', 'stores': { 'releases': { 'driver': 'file', 'path': Path(CACHE_DIR) / 'cache' / 'repositories' / 'pypi' }, 'packages': { 'driver': 'dict' } } }) super().__init__() def find_packages(self, name: str, constraint: Union[Constraint, str, None] = None, extras: Union[list, None] = None ) -> List[Package]: """ Find packages on the remote server. """ packages = [] if constraint is not None and not isinstance(constraint, BaseConstraint): version_parser = VersionParser() constraint = version_parser.parse_constraints(constraint) info = self.get_package_info(name) versions = [] for version, release in info['releases'].items(): if ( not constraint or (constraint and constraint.matches(Constraint('=', version))) ): versions.append(version) for version in versions: packages.append( self.package(name, version, extras=extras) ) return packages def package(self, name: str, version: str, extras: Union[list, None] = None) -> Package: try: index = self._packages.index(Package(name, version, version)) return self._packages[index] except ValueError: if extras is None: extras = [] release_info = self.get_release_info(name, version) package = Package(name, version, version) for req in release_info['requires_dist']: try: req = InstallRequirement.from_line(req) except Exception: # Probably an invalid marker # We strip the markers hoping for the best req = req.split(';')[0] req = InstallRequirement.from_line(req) name = req.name version = str(req.req.specifier) dependency = Dependency( name, version, optional=req.markers ) is_extra = False if req.markers: # Setting extra dependencies and requirements requirements = self._convert_markers( req.markers._markers ) if 'python_version' in requirements: ors = [] for or_ in requirements['python_version']: ands = [] for op, version in or_: ands.append(f'{op}{version}') ors.append(' '.join(ands)) dependency.python_versions = ' || '.join(ors) if 'sys_platform' in requirements: ors = [] for or_ in requirements['sys_platform']: ands = [] for op, platform in or_: ands.append(f'{op}{platform}') ors.append(' '.join(ands)) dependency.platform = ' || '.join(ors) if 'extra' in requirements: is_extra = True for _extras in requirements['extra']: for _, extra in _extras: if extra not in package.extras: package.extras[extra] = [] package.extras[extra].append(dependency) if not is_extra: package.requires.append(dependency) # Adding description package.description = release_info.get('summary', '') # Adding hashes information package.hashes = release_info['digests'] # Activate extra dependencies for extra in extras: if extra in package.extras: for dep in package.extras[extra]: dep.activate() package.requires += package.extras[extra] self._packages.append(package) return package def search(self, query, mode=0): results = [] search = { 'name': query } if mode == self.SEARCH_FULLTEXT: search['summary'] = query client = ServerProxy(self._url) hits = client.search(search, 'or') for hit in hits: results.append({ 'name': hit['name'], 'description': hit['summary'], 'version': hit['version'] }) return results def get_package_info(self, name: str) -> dict: """ Return the package information given its name. The information is returned from the cache if it exists or retrieved from the remote server. """ if self._disable_cache: return self._get_package_info(name) return self._cache.store('packages').remember_forever( f'{name}', lambda: self._get_package_info(name) ) def _get_package_info(self, name: str) -> dict: data = self._get(self._url + f'pypi/{name}/json') if data is None: raise ValueError(f'Package [{name}] not found.') return data def get_release_info(self, name: str, version: str) -> dict: """ Return the release information given a package name and a version. The information is returned from the cache if it exists or retrieved from the remote server. """ if self._disable_cache: return self._get_release_info(name, version) return self._cache.remember_forever( f'{name}:{version}', lambda: self._get_release_info(name, version) ) def _get_release_info(self, name: str, version: str) -> dict: json_data = self._get(self._url + f'pypi/{name}/{version}/json') if json_data is None: raise ValueError(f'Package [{name}] not found.') info = json_data['info'] data = { 'name': info['name'], 'version': info['version'], 'summary': info['summary'], 'platform': info['platform'], 'requires_dist': info['requires_dist'], 'requires_python': info['requires_python'], 'digests': [] } for file_info in json_data['releases'][version]: data['digests'].append(file_info['digests']['sha256']) return data def _get(self, url: str) -> Union[dict, None]: json_response = get(url) if json_response.status_code == 404: return None json_data = json_response.json() return json_data def _group_markers(self, markers): groups = [[]] for marker in markers: assert isinstance(marker, (list, tuple, str)) if isinstance(marker, list): groups[-1].append(self._group_markers(marker)) elif isinstance(marker, tuple): lhs, op, rhs = marker groups[-1].append((lhs.value, op, rhs.value)) else: assert marker in ["and", "or"] if marker == "or": groups.append([]) return groups def _convert_markers(self, markers): groups = self._group_markers(markers)[0] requirements = {} def _group(_groups, or_=False): nonlocal requirements for group in _groups: if isinstance(group, tuple): variable, op, value = group group_name = str(variable) if group_name not in requirements: requirements[group_name] = [[]] elif or_: requirements[group_name].append([]) requirements[group_name][-1].append((str(op), str(value))) else: _group(group, or_=True) _group(groups) return requirements PKgLoҊ !poetry/repositories/repository.pyimport re from poetry.semver.constraints import Constraint from poetry.semver.constraints.base_constraint import BaseConstraint from poetry.semver.version_parser import VersionParser from poetry.version import parse as parse_version from .base_repository import BaseRepository class Repository(BaseRepository): def __init__(self, packages=None): super(Repository, self).__init__() if packages is None: packages = [] for package in packages: self.add_package(package) def package(self, name, version): name = name.lower() version = str(parse_version(version)) for package in self.packages: if name == package.name and package.version == version: return package def find_packages(self, name, constraint=None, extras=None): name = name.lower() packages = [] if extras is None: extras = [] if not isinstance(constraint, BaseConstraint): parser = VersionParser() constraint = parser.parse_constraints(constraint) for package in self.packages: if name == package.name: pkg_constraint = Constraint('==', package.version) if constraint is None or constraint.matches(pkg_constraint): for extra in extras: if extra in package.extras: for dep in package.extras[extra]: dep.activate() package.requires += package.extras[extra] packages.append(package) return packages def search(self, query, mode=0): regex = '(?i)(?:{})'.format('|'.join(re.split('\s+', query))) matches = {} for package in self.packages: name = package.name if name in matches: continue if ( re.match(regex, name) is not None or ( mode == self.SEARCH_FULLTEXT and isinstance(package, CompletePackage) and re.match(regex, '') ) ): matches[name] = { 'name': package.pretty_name, 'description': (package.description if isinstance(package, CompletePackage) else '') } return list(matches.values()) def has_package(self, package): package_id = package.unique_name for repo_package in self.packages: if package_id == repo_package.unique_name: return True return False def add_package(self, package): self._packages.append(package) def remove_package(self, package): package_id = package.unique_name index = None for i, repo_package in enumerate(self.packages): if package_id == repo_package.unique_name: index = i break if index is not None: del self._packages[index] def __len__(self): return len(self._packages) PKʄTL8ESSpoetry/semver/__init__.pyfrom functools import cmp_to_key from .comparison import less_than from .constraints import Constraint from .helpers import normalize_version from .version_parser import VersionParser SORT_ASC = 1 SORT_DESC = -1 _parser = VersionParser() def statisfies(version, constraints): """ Determine if given version satisfies given constraints. :type version: str :type constraints: str :rtype: bool """ provider = Constraint('==', normalize_version(version)) constraints = _parser.parse_constraints(constraints) return constraints.matches(provider) def satisfied_by(versions, constraints): """ Return all versions that satisfy given constraints. :type versions: List[str] :type constraints: str :rtype: List[str] """ return [version for version in versions if statisfies(version, constraints)] def sort(versions): return _sort(versions, SORT_ASC) def rsort(versions): return _sort(versions, SORT_DESC) def _sort(versions, direction): normalized = [ (i, normalize_version(version)) for i, version in enumerate(versions) ] normalized.sort( key=cmp_to_key( lambda x, y: 0 if x[1] == y[1] else -direction * int(less_than(x[1], y[1]) or -1) ) ) return [versions[i] for i, _ in normalized] PK TLgnpoetry/semver/comparison.pyfrom .constraints.constraint import Constraint def greater_than(version1, version2): """ Evaluates the expression: version1 > version2. :type version1: str :type version2: str :rtype: bool """ return compare(version1, '>', version2) def greater_than_or_equal(version1, version2): """ Evaluates the expression: version1 >= version2. :type version1: str :type version2: str :rtype: bool """ return compare(version1, '>=', version2) def less_than(version1, version2): """ Evaluates the expression: version1 < version2. :type version1: str :type version2: str :rtype: bool """ return compare(version1, '<', version2) def less_than_or_equal(version1, version2): """ Evaluates the expression: version1 <= version2. :type version1: str :type version2: str :rtype: bool """ return compare(version1, '<=', version2) def equal(version1, version2): """ Evaluates the expression: version1 == version2. :type version1: str :type version2: str :rtype: bool """ return compare(version1, '==', version2) def not_equal(version1, version2): """ Evaluates the expression: version1 != version2. :type version1: str :type version2: str :rtype: bool """ return compare(version1, '!=', version2) def compare(version1, operator, version2): """ Evaluates the expression: $version1 $operator $version2 :type version1: str :type operator: str :type version2: str :rtype: bool """ constraint = Constraint(operator, version2) return constraint.matches(Constraint('==', version1)) PKyTL䢬0%poetry/semver/constraints/__init__.pyfrom .constraint import Constraint from .empty_constraint import EmptyConstraint from .multi_constraint import MultiConstraint PK1YVLGLτ\\,poetry/semver/constraints/base_constraint.pyclass BaseConstraint: def matches(self, provider): raise NotImplementedError() PK(gL 8'poetry/semver/constraints/constraint.pyimport operator from poetry.version import parse as parse_version from poetry.version import version_compare from ..helpers import normalize_version from .base_constraint import BaseConstraint class Constraint(BaseConstraint): OP_EQ = operator.eq OP_LT = operator.lt OP_LE = operator.le OP_GT = operator.gt OP_GE = operator.ge OP_NE = operator.ne _trans_op_str = { '=': OP_EQ, '==': OP_EQ, '<': OP_LT, '<=': OP_LE, '>': OP_GT, '>=': OP_GE, '!=': OP_NE } _trans_op_int = { OP_EQ: '==', OP_LT: '<', OP_LE: '<=', OP_GT: '>', OP_GE: '>=', OP_NE: '!=' } def __init__(self, operator: str, version: str): if operator not in self._trans_op_str: raise ValueError( f'Invalid operator "{operator}" given, ' f'expected one of: {", ".join(self.supported_operators)}' ) self._operator = self._trans_op_str[operator] self._string_operator = operator self._version = str(parse_version(version)) @property def supported_operators(self) -> list: return list(self._trans_op_str.keys()) @property def operator(self): return self._operator @property def string_operator(self): return self._string_operator @property def version(self) -> str: return self._version def matches(self, provider): if isinstance(provider, self.__class__): return self.match_specific(provider) # turn matching around to find a match return provider.matches(self) def version_compare(self, a: str, b: str, operator: str) -> bool: if operator not in self._trans_op_str: raise ValueError( f'Invalid operator "{operator}" given, ' f'expected one of: {", ".join(self.supported_operators)}' ) # If we can't normalize the version # we delegate to parse_version() try: a = normalize_version(a) except ValueError: pass try: b = normalize_version(b) except ValueError: pass return version_compare(a, b, operator) def match_specific(self, provider: 'Constraint') -> bool: no_equal_op = self._trans_op_int[self._operator].replace('=', '') provider_no_equal_op = self._trans_op_int[provider.operator].replace('=', '') is_equal_op = self.OP_EQ is self._operator is_non_equal_op = self.OP_NE is self._operator is_provider_equal_op = self.OP_EQ is provider.operator is_provider_non_equal_op = self.OP_NE is provider.operator # '!=' operator is match when other operator # is not '==' operator or version is not match # these kinds of comparisons always have a solution if is_non_equal_op or is_provider_non_equal_op: return (not is_equal_op and not is_provider_equal_op or self.version_compare(provider.version, self._version, '!=')) # An example for the condition is <= 2.0 & < 1.0 # These kinds of comparisons always have a solution if (self._operator is not self.OP_EQ and no_equal_op == provider_no_equal_op): return True if self.version_compare( provider.version, self.version, self._trans_op_int[self._operator] ): # special case, e.g. require >= 1.0 and provide < 1.0 # 1.0 >= 1.0 but 1.0 is outside of the provided interval if ( provider.version == self.version and self._trans_op_int[provider.operator] == provider_no_equal_op and self._trans_op_int[self.operator] != no_equal_op ): return False return True return False def __str__(self): return '{} {}'.format( self._trans_op_int[self._operator], self._version ) def __repr__(self): return ''.format(str(self)) PKdcL"-poetry/semver/constraints/empty_constraint.pyfrom .base_constraint import BaseConstraint class EmptyConstraint(BaseConstraint): pretty_string = None def matches(self, _): return True def __str__(self): return '*' PK+gL$pc-poetry/semver/constraints/multi_constraint.pyfrom .base_constraint import BaseConstraint class MultiConstraint(BaseConstraint): def __init__(self, constraints, conjunctive=True): self._constraints = tuple(constraints) self._conjunctive = conjunctive @property def constraints(self): return self._constraints def is_conjunctive(self): return self._conjunctive def is_disjunctive(self): return not self._conjunctive def matches(self, provider): if self.is_disjunctive(): for constraint in self._constraints: if constraint.matches(provider): return True return False for constraint in self._constraints: if not constraint.matches(provider): return False return True def __str__(self): constraints = [] for constraint in self._constraints: constraints.append(str(constraint)) return '{}'.format( (' ' if self._conjunctive else ' || ').join(constraints) ) PKYLx?+1F F poetry/semver/helpers.pyimport re _modifier_regex = ( '[._-]?' '(?:(stable|beta|b|RC|c|pre|alpha|a|patch|pl|p|post|[a-z])' '((?:[.-]?\d+)*)?)?' '([.-]?dev)?' ) def normalize_version(version): """ Normalizes a version string to be able to perform comparisons on it. """ version = version.strip() # strip off build metadata m = re.match('^([^,\s+]+)\+[^\s]+$', version) if m: version = m.group(1) index = None # Match classic versioning m = re.match( '(?i)^v?(\d{{1,5}})(\.\d+)?(\.\d+)?(\.\d+)?{}$'.format( _modifier_regex ), version ) if m: version = f'{m.group(1)}' \ f'{m.group(2) if m.group(2) else ".0"}' \ f'{m.group(3) if m.group(3) else ".0"}' \ f'{m.group(4) if m.group(4) else ".0"}' index = 5 else: # Some versions have the form M.m.p-\d+ # which means M.m.p-post\d+ m = re.match( '(?i)^v?(\d{{1,5}})(\.\d+)?(\.\d+)?(\.\d+)?-(?:\d+){}$'.format( _modifier_regex ), version ) if m: version = f'{m.group(1)}' \ f'{m.group(2) if m.group(2) else ".0"}' \ f'{m.group(3) if m.group(3) else ".0"}' \ f'{m.group(4) if m.group(4) else ".0"}' index = 5 else: # Match date(time) based versioning m = re.match( '(?i)^v?(\d{{4}}(?:[.:-]?\d{{2}}){{1,6}}(?:[.:-]?\d{{1,3}})?){}$'.format( _modifier_regex ), version ) if m: version = re.sub('\D', '.', m.group(1)) index = 2 # add version modifiers if a version was matched if index is not None: if len(m.groups()) - 1 >= index and m.group(index): version = f'{version}' \ f'-{_expand_stability(m.group(index))}' if m.group(index + 1): version = f'{version}.{m.group(index + 1).lstrip(".-")}' return version raise ValueError(f'Invalid version string "{version}"') def normalize_stability(stability: str) -> str: stability = stability.lower() if stability == 'rc': return 'RC' return stability def parse_stability(version: str) -> str: """ Returns the stability of a version. """ version = re.sub('(?i)#.+$', '', version) if 'dev-' == version[:4] or '-dev' == version[-4:]: return 'dev' m = re.search('(?i){}(?:\+.*)?$'.format(_modifier_regex), version.lower()) if m: if m.group(3): return 'dev' if m.group(1): if m.group(1) in ['beta', 'b']: return 'beta' elif m.group(1) in ['alpha', 'a']: return 'alpha' elif m.group(1) in ['rc', 'c']: return 'RC' else: return 'dev' return 'stable' def _expand_stability(stability: str) -> str: stability = stability.lower() if stability == 'a': return 'alpha' elif stability == 'b': return 'beta' elif stability in ['c', 'pre']: return 'rc' elif stability in ['p', 'pl']: return 'patch' elif stability in ['post']: return '' return stability PKgL) poetry/semver/version_parser.pyimport re from .constraints.constraint import Constraint from .constraints.empty_constraint import EmptyConstraint from .constraints.multi_constraint import MultiConstraint from .helpers import normalize_version, _expand_stability class VersionParser: _modifier_regex = ( '[._-]?' '(?:(stable|beta|b|RC|alpha|a|patch|pl|p)((?:[.-]?\d+)*)?)?' '([.-]?dev)?' ) _stabilities = [ 'stable', 'RC', 'beta', 'alpha', 'dev' ] def parse_constraints(self, constraints: str): """ Parses a constraint string into MultiConstraint and/or Constraint objects. """ pretty_constraint = constraints m = re.match( '(?i)([^,\s]*?)@({})$'.format('|'.join(self._stabilities)), constraints ) if m: constraints = m.group(1) if not constraints: constraints = '*' or_constraints = re.split('\s*\|\|?\s*', constraints.strip()) or_groups = [] for constraints in or_constraints: and_constraints = re.split( '(?< ,]) *(? 1: constraint_objects = [] for constraint in and_constraints: for parsed_constraint in self._parse_constraint(constraint): constraint_objects.append(parsed_constraint) else: constraint_objects = self._parse_constraint(and_constraints[0]) if len(constraint_objects) == 1: constraint = constraint_objects[0] else: constraint = MultiConstraint(constraint_objects) or_groups.append(constraint) if len(or_groups) == 1: constraint = or_groups[0] elif len(or_groups) == 2: # parse the two OR groups and if they are contiguous we collapse # them into one constraint a = str(or_groups[0]) b = str(or_groups[1]) pos_a = a.find('<', 4) pos_b = a.find('<', 4) if ( isinstance(or_groups[0], MultiConstraint) and isinstance(or_groups[1], MultiConstraint) and len(or_groups[0].constraints) and len(or_groups[1].constraints) and a[:3] == '>=' and pos_a != -1 and b[:3] == '>=' and pos_b != -1 and a[pos_a + 2:-1] == b[4:pos_b - 5] ): constraint = MultiConstraint( Constraint('>=', a[4:pos_a - 5]), Constraint('<', b[pos_b + 2:-1]) ) else: constraint = MultiConstraint(or_groups, False) else: constraint = MultiConstraint(or_groups, False) constraint.pretty_string = pretty_constraint return constraint def _parse_constraint(self, constraint): m = re.match('(?i)^v?[xX*](\.[xX*])*$', constraint) if m: return EmptyConstraint(), version_regex = ( 'v?(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?{}(?:\+[^\s]+)?' ).format(self._modifier_regex) # Tilde range # # Like wildcard constraints, unsuffixed tilde constraints # say that they must be greater than the previous version, # to ensure that unstable instances of the current version are allowed. # However, if a stability suffix is added to the constraint, # then a >= match on the current version is used instead. m = re.match('(?i)^~{}$'.format(version_regex), constraint) if m: # Work out which position in the version we are operating at if m.group(4): position = 3 elif m.group(3): position = 2 elif m.group(2): position = 2 else: position = 0 # Calculate the stability suffix stability_suffix = '' if m.group(5): stability_suffix += '-{}{}'.format( _expand_stability(m.group(5)), '.' + m.group(6) if m.group(6) else '' ) low_version = self._manipulate_version_string( m.groups(), position, 0 ) + stability_suffix lower_bound = Constraint('>=', low_version) # For upper bound, # we increment the position of one more significance, # but high_position = 0 would be illegal high_position = max(0, position - 1) high_version = self._manipulate_version_string( m.groups(), high_position, 1 ) upper_bound = Constraint('<', high_version) return lower_bound, upper_bound # Caret range # # Allows changes that do not modify # the left-most non-zero digit in the [major, minor, patch] tuple. # In other words, this allows: # - patch and minor updates for versions 1.0.0 and above, # - patch updates for versions 0.X >=0.1.0, # - and no updates for versions 0.0.X m = re.match('^\^{}($)'.format(version_regex), constraint) if m: if m.group(1) != '0' or not m.group(2): position = 0 elif m.group(2) != '0' or not m.group(3): position = 1 else: position = 2 low_version = normalize_version(constraint[1:]) lower_bound = Constraint('>=', low_version) # For upper bound, # we increment the position of one more significance, # but high_position = 0 would be illegal high_version = self._manipulate_version_string( m.groups(), position, 1 ) upper_bound = Constraint('<', high_version) return lower_bound, upper_bound # X range # # Any of X, x, or * may be used to "stand in" # for one of the numeric values in the [major, minor, patch] tuple. # A partial version range is treated as an X-Range, # so the special character is in fact optional. m = re.match( '^v?(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.[xX*])+$', constraint ) if m: if m.group(3): position = 2 elif m.group(2): position = 1 else: position = 0 low_version = self._manipulate_version_string( m.groups(), position ) high_version = self._manipulate_version_string( m.groups(), position, 1 ) if low_version == '0.0.0.0': return Constraint('<', high_version), return Constraint('>=', low_version), Constraint('<', high_version) # Basic Comparators m = re.match('^(<>|!=|>=?|<=?|==?)?\s*(.*)', constraint) if m: try: version = normalize_version(m.group(2)) return Constraint(m.group(1) or '=', version), except ValueError: pass raise ValueError( 'Could not parse version constraint: {}'.format(constraint) ) def _manipulate_version_string(self, matches, position, increment=0, pad='0'): """ Increment, decrement, or simply pad a version number. """ matches = [matches[i] if i < len(matches) - 1 and matches[i] is not None else '0' for i in range(4)] for i in range(3, -1, -1): if i > position: matches[i] = pad elif i == position and increment: matches[i] = int(matches[i]) + increment # If $matches[i] was 0, carry the decrement if matches[i] < 0: matches[i] = pad position -= 1 # Return null on a carry overflow if i == 1: return return '{}.{}.{}.{}'.format(matches[0], matches[1], matches[2], matches[3]) PKdLHWpoetry/toml/__init__.py""" This toml module is a port with changes and fixes of [contoml](https://github.com/jumpscale7/python-consistent-toml). """ from .toml_file import TOMLFile from .prettify.lexer import tokenize as lexer from .prettify.parser import parse_tokens def loads(text): """ Parses TOML text into a dict-like object and returns it. """ tokens = tuple(lexer(text, is_top_level=True)) elements = parse_tokens(tokens) return TOMLFile(elements) def load(file_path): """ Parses a TOML file into a dict-like object and returns it. """ with open(file_path) as fd: return loads(fd.read()) def dumps(value): """ Dumps a data structure to TOML source code. The given value must be either a dict of dict values, a dict, or a TOML file constructed by this module. """ if not isinstance(value, TOMLFile): raise RuntimeError( 'Can only dump a TOMLFile instance loaded by load() or loads()' ) return value.dumps() def dump(obj, file_path, prettify=False): """ Dumps a data structure to the filesystem as TOML. The given value must be either a dict of dict values, a dict, or a TOML file constructed by this module. """ with open(file_path, 'w') as fp: fp.write(dumps(obj)) PKbL)œpoetry/toml/array.pyfrom .prettify.errors import InvalidValueError from .freshtable import FreshTable from .prettify import util class ArrayOfTables(list): def __init__(self, toml_file, name, iterable=None): if iterable: list.__init__(self, iterable) self._name = name self._toml_file = toml_file def append(self, value): if isinstance(value, dict): table = FreshTable(parent=self, name=self._name, is_array=True) table._append_to_parent() index = len(self._toml_file[self._name]) - 1 for key_seq, value in util.flatten_nested(value).items(): # self._toml_file._setitem_with_key_seq((self._name, index) + key_seq, value) self._toml_file._array_setitem_with_key_seq(self._name, index, key_seq, value) # for k, v in value.items(): # table[k] = v else: raise InvalidValueError('Can only append a dict to an array of tables') def __getitem__(self, item): try: return list.__getitem__(self, item) except IndexError: if item == len(self): return FreshTable(parent=self, name=self._name, is_array=True) else: raise def append_fresh_table(self, fresh_table): list.append(self, fresh_table) if self._toml_file: self._toml_file.append_fresh_table(fresh_table) PKidLozggpoetry/toml/cascadedict.pyimport operator from functools import reduce from . import raw class CascadeDict: """ A dict-like object made up of one or more other dict-like objects where querying for an item cascade-gets it from all the internal dicts in order of their listing, and setting an item sets it on the first dict listed. """ def __init__(self, *internal_dicts): assert internal_dicts, 'internal_dicts cannot be empty' self._internal_dicts = tuple(internal_dicts) def cascaded_with(self, one_more_dict): """ Returns another instance with one more dict cascaded at the end. """ return CascadeDict(*self._internal_dicts, one_more_dict) def __getitem__(self, item): for d in self._internal_dicts: try: return d[item] except KeyError: pass raise KeyError def __setitem__(self, key, value): self._internal_dicts[0][key] = value def get(self, item, default=None): try: return self[item] except KeyError: return default def keys(self): return set(reduce(operator.or_, (set(d.keys()) for d in self._internal_dicts))) def items(self): all_items = reduce(operator.add, (list(d.items()) for d in reversed(self._internal_dicts))) unique_items = {k: v for k, v in all_items}.items() return tuple(unique_items) def __contains__(self, item): for d in self._internal_dicts: if item in d: return True return False def __len__(self): return len(self.keys()) @property def neutralized(self): return {k: raw.to_raw(v) for k, v in self.items()} @property def primitive_value(self): return self.neutralized def __repr__(self): return repr(self.primitive_value) PKbLopoetry/toml/freshtable.pyfrom .prettify.elements.table import TableElement class FreshTable(TableElement): """ A fresh TableElement that appended itself to each of parents when it first gets written to at most once. parents is a sequence of objects providing an append_fresh_table(TableElement) method """ def __init__(self, parent, name, is_array=False): TableElement.__init__(self, sub_elements=[]) self._parent = parent self._name = name self._is_array = is_array # As long as this flag is false, setitem() operations will append the table header and this table # to the toml_file's elements self.__appended = False @property def name(self): return self._name @property def is_array(self): return self._is_array def _append_to_parent(self): """ Causes this ephemeral table to be persisted on the TOMLFile. """ if self.__appended: return if self._parent is not None: self._parent.append_fresh_table(self) self.__appended = True def __setitem__(self, key, value): TableElement.__setitem__(self, key, value) self._append_to_parent() PKbLpoetry/toml/peekableit.pyimport itertools class PeekableIterator: # Returned by peek() when the iterator is exhausted. Truthiness is False. Nothing = tuple() def __init__(self, iter): self._iter = iter def __next__(self): return next(self._iter) def next(self): return self.__next__() def __iter__(self): return self def peek(self): """ Returns PeekableIterator.Nothing when the iterator is exhausted. """ try: v = next(self._iter) self._iter = itertools.chain((v,), self._iter) return v except StopIteration: return PeekableIterator.Nothing PK~bL+gŸ poetry/toml/prettify/__init__.pyfrom ._version import VERSION __version__ = VERSION def prettify(toml_text): """ Prettifies and returns the TOML file content provided. """ from .parser import parse_tokens from .lexer import tokenize from .prettifier import prettify as element_prettify tokens = tokenize(toml_text, is_top_level=True) elements = parse_tokens(tokens) prettified = element_prettify(elements) return ''.join(pretty_element.serialized() for pretty_element in prettified) def prettify_from_file(file_path): """ Reads, prettifies and returns the TOML file specified by the file_path. """ with open(file_path, 'r') as fp: return prettify(fp.read()) PK~bLy poetry/toml/prettify/_version.pyVERSION = 'master' PK~bLcss)poetry/toml/prettify/elements/__init__.py """ TOML file elements (a higher abstraction layer than individual lexical tokens). """ from .traversal import TraversalMixin from .errors import InvalidElementError from .table import TableElement from .tableheader import TableHeaderElement from .common import TYPE_METADATA, TYPE_ATOMIC, TYPE_CONTAINER, TYPE_MARKUP from . import traversal from . import factory PKdL5 .poetry/toml/prettify/elements/abstracttable.pyfrom .common import ContainerElement from . import traversal class AbstractTable(ContainerElement, traversal.TraversalMixin, dict): """ Common code for handling tables as key-value pairs with metadata elements sprinkled all over. Assumes input sub_elements are correct. """ def __init__(self, sub_elements): ContainerElement.__init__(self, sub_elements) self._fallback = None def _enumerate_items(self): """ Returns ((key_index, key_element), (value_index, value_element)) for all the element key-value pairs. """ non_metadata = self._enumerate_non_metadata_sub_elements() while True: yield next(non_metadata), next(non_metadata) def items(self): for (key_i, key), (value_i, value) in self._enumerate_items(): yield key.value, value.value if self._fallback: for key, value in self._fallback.items(): yield key, value def keys(self): return tuple(key for (key, _) in self.items()) def values(self): return tuple(value for (_, value) in self.items()) def __len__(self): return len(tuple(self._enumerate_items())) def __contains__(self, item): return item in self.keys() def _find_key_and_value(self, key): """ Returns (key_i, value_i) corresponding to the given key value. Raises KeyError if no matching key found. """ for (key_i, key_element), (value_i, value_element) in self._enumerate_items(): if key_element.value == key: return key_i, value_i raise KeyError def __getitem__(self, item): for key, value in self.items(): if key == item: return value raise KeyError def get(self, key, default=None): try: return self[key] except KeyError: return default def set_fallback(self, fallback): """ Sets a fallback dict-like instance to be used to look up values after they are not found in this instance. """ self._fallback = fallback @property def primitive_value(self): """ Returns a primitive Python value without any formatting or markup metadata. """ return { key: value.primitive_value if hasattr(value, 'primitive_value') else value for key, value in self.items() } PK+cLBN&poetry/toml/prettify/elements/array.pyfrom . import factory, traversal from .common import Element, ContainerElement from .factory import create_element from .metadata import NewlineElement from .errors import InvalidElementError class ArrayElement(ContainerElement, traversal.TraversalMixin, list): """ A sequence-like container element containing other atomic elements or other containers. Implements list-like interface. Assumes input sub_elements are correct for an array element. Raises an InvalidElementError if contains heterogeneous values. """ def __init__(self, sub_elements): super(ArrayElement, self).__init__(sub_elements) self._check_homogeneity() def _check_homogeneity(self): if len(set(type(v) for v in self.primitive_value)) > 1: raise InvalidElementError('Array should be homogeneous') def __len__(self): return len(tuple(self._enumerate_non_metadata_sub_elements())) def __getitem__(self, i): """ Returns the ith entry, which can be a primitive value, a seq-lie, or a dict-like object. """ return self._find_value(i)[1].value def __setitem__(self, i, value): value_i, _ = self._find_value(i) new_element = value if isinstance(value, Element) else factory.create_element(value) self._sub_elements = self.sub_elements[:value_i] + [new_element] + self.sub_elements[value_i+1:] @property def value(self): return self # self is a sequence-like value @property def primitive_value(self): """ Returns a primitive Python value without any formatting or markup metadata. """ return list( self[i].primitive_value if hasattr(self[i], 'primitive_value') else self[i] for i in range(len(self))) def __str__(self): return "{}".format(self.primitive_value) def __repr__(self): return "Array{}".format(str(self)) def append(self, v): new_entry = [create_element(v)] if self: # If not empty, we need a comma and whitespace prefix! new_entry = [ factory.create_operator_element(','), factory.create_whitespace_element(), ] + new_entry insertion_index = self._find_closing_square_bracket() self._sub_elements = self._sub_elements[:insertion_index] + new_entry + \ self._sub_elements[insertion_index:] def _find_value(self, i): """ Returns (value_index, value) of ith value in this sequence. Raises IndexError if not found. """ return tuple(self._enumerate_non_metadata_sub_elements())[i] def __delitem__(self, i): value_i, value = self._find_value(i) begin, end = value_i, value_i+1 # Rules: # 1. begin should be index to the preceding comma to the value # 2. end should be index to the following comma, or the closing bracket # 3. If no preceding comma found but following comma found then end should be the index of the following value preceding_comma = self._find_preceding_comma(value_i) found_preceding_comma = preceding_comma >= 0 if found_preceding_comma: begin = preceding_comma following_comma = self._find_following_comma(value_i) if following_comma >= 0: if not found_preceding_comma: end = self._find_following_non_metadata(following_comma) else: end = following_comma else: end = self._find_following_closing_square_bracket(0) self._sub_elements = self.sub_elements[:begin] + self._sub_elements[end:] @property def is_multiline(self): return any(isinstance(e, (NewlineElement)) for e in self.elements) def turn_into_multiline(self): """ Turns this array into a multi-line array with each element lying on its own line. """ if self.is_multiline: return i = self._find_following_comma(-1) def next_entry_i(): return self._find_following_non_metadata(i) def next_newline_i(): return self._find_following_newline(i) def next_closing_bracket_i(): return self._find_following_closing_square_bracket(i) def next_comma_i(): return self._find_following_comma(i) while i < len(self.elements)-1: if next_newline_i() < next_entry_i(): self.elements.insert(i+1, factory.create_newline_element()) if float('-inf') < next_comma_i() < next_closing_bracket_i(): i = next_comma_i() else: i = next_closing_bracket_i() PK-bL='poetry/toml/prettify/elements/atomic.pyfrom ..tokens import py2toml, toml2py from ..util import is_dict_like, is_sequence_like from . import common from .errors import InvalidElementError class AtomicElement(common.TokenElement): """ An element containing a sequence of tokens representing a single atomic value that can be updated in place. Raises: InvalidElementError: when passed an invalid sequence of tokens. """ def __init__(self, _tokens): common.TokenElement.__init__(self, _tokens, common.TYPE_ATOMIC) def _validate_tokens(self, _tokens): if len([token for token in _tokens if not token.type.is_metadata]) != 1: raise InvalidElementError('Tokens making up an AtomicElement must contain only one non-metadata token') def serialized(self): return ''.join(token.source_substring for token in self.tokens) def _value_token_index(self): """ Finds the token where the value is stored. """ # TODO: memoize this value for i, token in enumerate(self.tokens): if not token.type.is_metadata: return i raise RuntimeError('could not find a value token') @property def value(self): """ Returns a Python value contained in this atomic element. """ return toml2py.deserialize(self._tokens[self._value_token_index()]) @property def primitive_value(self): return self.value def set(self, value): """ Sets the contained value to the given one. """ assert (not is_sequence_like(value)) and (not is_dict_like(value)), 'the value must be an atomic primitive' token_index = self._value_token_index() self._tokens[token_index] = py2toml.create_primitive_token(value) PKcL/  'poetry/toml/prettify/elements/common.pyfrom abc import abstractmethod TYPE_METADATA = 'element-metadata' TYPE_ATOMIC = 'element-atomic' TYPE_CONTAINER = 'element-container' TYPE_MARKUP = 'element-markup' class Element: """ An Element: - is one or more Token instances, or one or more other Element instances. Not both. - knows how to serialize its value back to valid TOML code. A non-metadata Element is an Element that: - knows how to deserialize its content into usable Python primitive, seq-like, or dict-like value. - knows how to update its content from a Python primitive, seq-like, or dict-like value while maintaining its formatting. """ def __init__(self, _type): self._type = _type @property def type(self): return self._type @abstractmethod def serialized(self): """ TOML serialization of this element as str. """ raise NotImplementedError class TokenElement(Element): """ An Element made up of tokens """ def __init__(self, _tokens, _type): Element.__init__(self, _type) self._validate_tokens(_tokens) self._tokens = list(_tokens) @property def tokens(self): return self._tokens @property def first_token(self): return self._tokens[0] @abstractmethod def _validate_tokens(self, _tokens): raise NotImplementedError def serialized(self): return ''.join(token.source_substring for token in self._tokens) def __repr__(self): return repr(self.tokens) @property def primitive_value(self): """ Returns a primitive Python value without any formatting or markup metadata. """ raise NotImplementedError class ContainerElement(Element): """ An Element containing exclusively other elements. """ def __init__(self, sub_elements): Element.__init__(self, TYPE_CONTAINER) self._sub_elements = list(sub_elements) @property def sub_elements(self): return self._sub_elements @property def elements(self): return self.sub_elements def serialized(self): return ''.join(element.serialized() for element in self.sub_elements) def __eq__(self, other): return self.primitive_value == other def __repr__(self): return repr(self.primitive_value) @property def primitive_value(self): """ Returns a primitive Python value without any formatting or markup metadata. """ raise NotImplementedError PK~bLVػbb'poetry/toml/prettify/elements/errors.py class InvalidElementError(Exception): """ Raised by Element factories when the given sequence of tokens or sub-elements are invalid for the specific type of Element being created. """ def __init__(self, message): self.message = message def __repr__(self): return "InvalidElementError: {}".format(self.message) PKdbLAL(poetry/toml/prettify/elements/factory.pyimport datetime import six from .. import tokens from ..tokens import py2toml from ..util import join_with from .atomic import AtomicElement from .metadata import PunctuationElement, WhitespaceElement, NewlineElement from .tableheader import TableHeaderElement def create_element(value, multiline_strings_allowed=True): """ Creates and returns the appropriate elements.Element instance from the given Python primitive, sequence-like, or dict-like value. """ from .array import ArrayElement if isinstance(value, (int, float, bool, datetime.datetime, datetime.date) + six.string_types) or value is None: primitive_token = py2toml.create_primitive_token(value, multiline_strings_allowed=multiline_strings_allowed) return AtomicElement((primitive_token,)) elif isinstance(value, (list, tuple)): preamble = [create_operator_element('[')] postable = [create_operator_element(']')] stuffing_elements = [create_element(v) for v in value] spaced_stuffing = join_with(stuffing_elements, separator=[create_operator_element(','), create_whitespace_element()]) return ArrayElement(preamble + spaced_stuffing + postable) elif isinstance(value, dict): return create_inline_table(value, multiline_table=False, multiline_strings_allowed=multiline_strings_allowed) else: raise RuntimeError('Value type unaccounted for: {} of type {}'.format(value, type(value))) def create_inline_table(from_dict, multiline_table=False, multiline_strings_allowed=True): """ Creates an InlineTable element from the given dict instance. """ from .inlinetable import InlineTableElement preamble = [create_operator_element('{')] postable = [create_operator_element('}')] stuffing_elements = ( ( create_string_element(k, bare_allowed=True), create_whitespace_element(), create_operator_element('='), create_whitespace_element(), create_element(v, multiline_strings_allowed=False) ) for (k, v) in from_dict.items()) pair_separator = [create_operator_element(','), create_newline_element() if multiline_table else create_whitespace_element()] spaced_elements = join_with(stuffing_elements, separator=pair_separator) return InlineTableElement(preamble + spaced_elements + postable) def create_string_element(value, bare_allowed=False): """ Creates and returns an AtomicElement wrapping a string value. """ return AtomicElement((py2toml.create_string_token(value, bare_allowed),)) def create_operator_element(operator): """ Creates a PunctuationElement instance containing an operator token of the specified type. The operator should be a TOML source str. """ operator_type_map = { ',': tokens.TYPE_OP_COMMA, '=': tokens.TYPE_OP_ASSIGNMENT, '[': tokens.TYPE_OP_SQUARE_LEFT_BRACKET, ']': tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, '[[': tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET, ']]': tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET, '{': tokens.TYPE_OP_CURLY_LEFT_BRACKET, '}': tokens.TYPE_OP_CURLY_RIGHT_BRACKET, } ts = (tokens.Token(operator_type_map[operator], operator),) return PunctuationElement(ts) def create_newline_element(): """ Creates and returns a single NewlineElement. """ ts = (tokens.Token(tokens.TYPE_NEWLINE, '\n'),) return NewlineElement(ts) def create_whitespace_element(length=1, char=' '): """ Creates and returns a WhitespaceElement containing spaces. """ ts = (tokens.Token(tokens.TYPE_WHITESPACE, char),) * length return WhitespaceElement(ts) def create_table_header_element(names): name_elements = [] if isinstance(names, six.string_types): name_elements = [py2toml.create_string_token(names, bare_string_allowed=True)] else: for (i, name) in enumerate(names): name_elements.append(py2toml.create_string_token(name, bare_string_allowed=True)) if i < (len(names)-1): name_elements.append(py2toml.operator_token(tokens.TYPE_OPT_DOT)) return TableHeaderElement( [py2toml.operator_token(tokens.TYPE_OP_SQUARE_LEFT_BRACKET)] + name_elements + [py2toml.operator_token(tokens.TYPE_OP_SQUARE_RIGHT_BRACKET), py2toml.operator_token(tokens.TYPE_NEWLINE)], ) def create_array_of_tables_header_element(name): return TableHeaderElement(( py2toml.operator_token(tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET), py2toml.create_string_token(name, bare_string_allowed=True), py2toml.operator_token(tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET), py2toml.operator_token(tokens.TYPE_NEWLINE), )) def create_table(dict_value): """ Creates a TableElement out of a dict instance. """ from .table import TableElement if not isinstance(dict_value, dict): raise ValueError('input must be a dict instance.') table_element = TableElement([create_newline_element()]) for k, v in dict_value.items(): table_element[k] = create_element(v) return table_element def create_multiline_string(text, maximum_line_length): return AtomicElement(_tokens=[py2toml.create_multiline_string(text, maximum_line_length)]) PKbL- - ,poetry/toml/prettify/elements/inlinetable.pyfrom . import factory, abstracttable from .common import Element class InlineTableElement(abstracttable.AbstractTable): """ An Element containing key-value pairs, representing an inline table. Implements dict-like interface. Assumes input sub_elements are correct for an inline table element. """ def __init__(self, sub_elements): abstracttable.AbstractTable.__init__(self, sub_elements) def __setitem__(self, key, value): new_element = value if isinstance(value, Element) else factory.create_element(value) try: key_i, value_i = self._find_key_and_value(key) # Found, then replace the value element with a new one self._sub_elements = self.sub_elements[:value_i] + [new_element] + self.sub_elements[value_i+1:] except KeyError: # Key does not exist, adding anew! new_entry = [ factory.create_string_element(key, bare_allowed=True), factory.create_whitespace_element(), factory.create_operator_element('='), factory.create_whitespace_element(), new_element, ] if self: # If not empty new_entry = [ factory.create_operator_element(','), factory.create_whitespace_element(), ] + new_entry insertion_index = self._find_closing_curly_bracket() self._sub_elements = self.sub_elements[:insertion_index] + new_entry + self.sub_elements[insertion_index:] def __delitem__(self, key): key_i, value_i = self._find_key_and_value(key) begin, end = key_i, value_i+1 # Rules: # 1. begin should be index to the preceding comma to the key # 2. end should be index to the following comma, or the closing bracket # 3. If no preceding comma found but following comma found then end should be the index of the following key preceding_comma = self._find_preceding_comma(begin) found_preceding_comma = preceding_comma >= 0 if found_preceding_comma: begin = preceding_comma following_comma = self._find_following_comma(value_i) if following_comma >= 0: if not found_preceding_comma: end = self._find_following_non_metadata(following_comma) else: end = following_comma else: end = self._find_closing_curly_bracket() self._sub_elements = self.sub_elements[:begin] + self.sub_elements[end:] def multiline_equivalent(self): return factory.create_inline_table(self.primitive_value, multiline_table=True, multiline_strings_allowed=True) @property def value(self): return self # self is a dict-like value that is perfectly usable PK bL(6 )poetry/toml/prettify/elements/metadata.pyfrom .. import tokens from . import common from .errors import InvalidElementError class WhitespaceElement(common.TokenElement): """ An element that contains tokens of whitespace """ def __init__(self, _tokens): common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA) def _validate_tokens(self, _tokens): for token in _tokens: if token.type != tokens.TYPE_WHITESPACE: raise InvalidElementError('Tokens making up a WhitespaceElement must all be whitespace') @property def length(self): """ The whitespace length of this element """ return len(self.tokens) class NewlineElement(common.TokenElement): """ An element containing newline tokens Raises: InvalidElementError: when passed an invalid sequence of tokens. """ def __init__(self, _tokens): common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA) def _validate_tokens(self, _tokens): for token in _tokens: if token.type != tokens.TYPE_NEWLINE: raise InvalidElementError('Tokens making a NewlineElement must all be newlines') class CommentElement(common.TokenElement): """ An element containing a single comment token followed by a newline. Raises: InvalidElementError: when passed an invalid sequence of tokens. """ def __init__(self, _tokens): common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA) def _validate_tokens(self, _tokens): if len(_tokens) != 2 or _tokens[0].type != tokens.TYPE_COMMENT or _tokens[1].type != tokens.TYPE_NEWLINE: raise InvalidElementError('CommentElement needs one comment token followed by one newline token') class PunctuationElement(common.TokenElement): """ An element containing a single punctuation token. Raises: InvalidElementError: when passed an invalid sequence of tokens. """ def __init__(self, _tokens): common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA) @property def token(self): """ Returns the token contained in this Element. """ return self.tokens[0] def _validate_tokens(self, _tokens): if not _tokens or not tokens.is_operator(_tokens[0]): raise InvalidElementError('PunctuationElement must be made of only a single operator token') PKdLDC(&poetry/toml/prettify/elements/table.pyfrom . import abstracttable, common, factory from .errors import InvalidElementError from .common import Element from .metadata import CommentElement, NewlineElement, WhitespaceElement class TableElement(abstracttable.AbstractTable): """ An Element containing an unnamed top-level table. Implements dict-like interface. Assumes input sub_elements are correct. Raises InvalidElementError on duplicate keys. """ def __init__(self, sub_elements): abstracttable.AbstractTable.__init__(self, sub_elements) self._check_for_duplicate_keys() def _check_for_duplicate_keys(self): if len(set(self.keys())) < len(self.keys()): raise InvalidElementError('Duplicate keys found') def __setitem__(self, key, value): if key in self: self._update(key, value) else: self._insert(key, value) def _update(self, key, value): _, value_i = self._find_key_and_value(key) self._sub_elements[value_i] = value if isinstance(value, Element) else factory.create_element(value) def _find_insertion_index(self): """ Returns the self.sub_elements index in which new entries should be inserted. """ non_metadata_elements = tuple(self._enumerate_non_metadata_sub_elements()) if not non_metadata_elements: return 0 last_entry_i = non_metadata_elements[-1][0] following_newline_i = self._find_following_line_terminator(last_entry_i) return following_newline_i + 1 def _detect_indentation_size(self): """ Detects the level of indentation used in this table. """ def lines(): # Returns a sequence of sequences of elements belonging to each line start = 0 for i, element in enumerate(self.elements): if isinstance(element, (CommentElement, NewlineElement)): yield self.elements[start:i+1] start = i+1 def indentation(line): # Counts the number of whitespace tokens at the beginning of this line try: first_non_whitespace_i = next(i for (i, e) in enumerate(line) if not isinstance(e, WhitespaceElement)) return sum(space.length for space in line[:first_non_whitespace_i]) except StopIteration: return 0 def is_empty_line(line): return all(e.type == common.TYPE_METADATA for e in line) try: return min(indentation(line) for line in lines() if len(line) > 1 and not is_empty_line(line)) except ValueError: # Raised by ValueError when no matching lines found return 0 def _insert(self, key, value): value_element = value if isinstance(value, Element) else factory.create_element(value) indentation_size = self._detect_indentation_size() indentation = [factory.create_whitespace_element(self._detect_indentation_size())] if indentation_size else [] inserted_elements = indentation + [ factory.create_string_element(key, bare_allowed=True), factory.create_whitespace_element(), factory.create_operator_element('='), factory.create_whitespace_element(), value_element, factory.create_newline_element(), ] insertion_index = self._find_insertion_index() self._sub_elements = \ self.sub_elements[:insertion_index] + inserted_elements + self.sub_elements[insertion_index:] def __delitem__(self, key): begin, _ = self._find_key_and_value(key) preceding_newline = self._find_preceding_newline(begin) if preceding_newline >= 0: begin = preceding_newline end = self._find_following_newline(begin) if end < 0: end = len(tuple(self._sub_elements)) self._sub_elements = self.sub_elements[:begin] + self.sub_elements[end:] @property def value(self): return self def __eq__(self, other): return self.primitive_value == other def __iter__(self): return iter(self.keys()) def __str__(self): return str(self.primitive_value) PKbLoR R ,poetry/toml/prettify/elements/tableheader.pyfrom .. import tokens from ..tokens import toml2py from . import common from .common import TokenElement from .errors import InvalidElementError _opening_bracket_types = (tokens.TYPE_OP_SQUARE_LEFT_BRACKET, tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET) _closing_bracket_types = (tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET) _name_types = ( tokens.TYPE_BARE_STRING, tokens.TYPE_LITERAL_STRING, tokens.TYPE_STRING, ) class TableHeaderElement(TokenElement): """ An element containing opening and closing single and double square brackets, strings and dots and ending with a newline. Raises InvalidElementError. """ def __init__(self, _tokens): TokenElement.__init__(self, _tokens, common.TYPE_MARKUP) self._names = tuple(toml2py.deserialize(token) for token in self._tokens if token.type in _name_types) @property def is_array_of_tables(self): opening_bracket = next(token for i, token in enumerate(self._tokens) if token.type in _opening_bracket_types) return opening_bracket.type == tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET @property def names(self): """ Returns a sequence of string names making up this table header name. """ return self._names def has_name_prefix(self, names): """ Returns True if the header names is prefixed by the given sequence of names. """ for i, name in enumerate(names): if self.names[i] != name: return False return True def serialized(self): return ''.join(token.source_substring for token in self._tokens) def is_named(self, names): """ Returns True if the given name sequence matches the full name of this header. """ return tuple(names) == self.names def _validate_tokens(self, _tokens): opening_bracket_i = next((i for i, token in enumerate(_tokens) if token.type in _opening_bracket_types), float('-inf')) if opening_bracket_i < 0: raise InvalidElementError('Expected an opening bracket') _tokens = _tokens[opening_bracket_i+1:] first_name_i = next((i for i, token in enumerate(_tokens) if token.type in _name_types), float('-inf')) if first_name_i < 0: raise InvalidElementError('Expected a table header name') _tokens = _tokens[first_name_i+1:] while True: next_dot_i = next((i for i, token in enumerate(_tokens) if token.type == tokens.TYPE_OPT_DOT), float('-inf')) if next_dot_i < 0: break _tokens = _tokens[next_dot_i+1:] next_name_i = next((i for i, token in enumerate(_tokens) if token.type in _name_types), float('-inf')) if next_name_i < 0: raise InvalidElementError('Expected a name after the dot') _tokens = _tokens[next_name_i+1:] closing_bracket_i = next((i for i, token in enumerate(_tokens) if token.type in _closing_bracket_types), float('-inf')) if closing_bracket_i < 0: raise InvalidElementError('Expected a closing bracket') if _tokens[-1].type != tokens.TYPE_NEWLINE: raise InvalidElementError('Must end with a newline') PKcL2'663poetry/toml/prettify/elements/traversal/__init__.pyfrom ...tokens import TYPE_OP_COMMA from ...tokens import TYPE_OP_CURLY_RIGHT_BRACKET from ..common import TYPE_METADATA from ..metadata import PunctuationElement, NewlineElement from . import predicates class TraversalMixin: """ A mix-in that provides convenient sub-element traversal to any class with an `elements` member that is a sequence of Element instances """ def __find_following_element(self, index, predicate): """ Finds and returns the index of element in self.elements that evaluates the given predicate to True and whose index is higher than the given index, or returns -Infinity on failure. """ return find_following(self.elements, predicate, index) def __find_preceding_element(self, index, predicate): """ Finds and returns the index of the element in self.elements that evaluates the given predicate to True and whose index is lower than the given index. """ i = find_previous(self.elements, predicate, index) if i == float('inf'): return float('-inf') return i def __must_find_following_element(self, predicate): """ Finds and returns the index to the element in self.elements that evaluatest the predicate to True, or raises an error. """ i = self.__find_following_element(-1, predicate) if i < 0: raise RuntimeError('Could not find non-optional element') return i def _enumerate_non_metadata_sub_elements(self): """ Returns a sequence of of (index, sub_element) of the non-metadata sub-elements. """ return ((i, element) for i, element in enumerate(self.elements) if element.type != TYPE_METADATA) def _find_preceding_comma(self, index): """ Returns the index of the preceding comma element to the given index, or -Infinity. """ return self.__find_preceding_element(index, predicates.op_comma) def _find_following_comma(self, index): """ Returns the index of the following comma element after the given index, or -Infinity. """ def predicate(element): return isinstance(element, PunctuationElement) and element.token.type == TYPE_OP_COMMA return self.__find_following_element(index, predicate) def _find_following_newline(self, index): """ Returns the index of the following newline element after the given index, or -Infinity. """ return self.__find_following_element(index, lambda e: isinstance(e, NewlineElement)) def _find_following_comment(self, index): """ Returns the index of the following comment element after the given index, or -Infinity. """ return self.__find_following_element(index, predicates.comment) def _find_following_line_terminator(self, index): """ Returns the index of the following comment or newline element after the given index, or -Infinity. """ following_comment = self._find_following_comment(index) following_newline = self._find_following_newline(index) if following_comment == float('-inf'): return following_newline if following_newline == float('inf'): return following_comment if following_newline < following_comment: return following_newline else: return following_comment def _find_preceding_newline(self, index): """ Returns the index of the preceding newline element to the given index, or -Infinity. """ return self.__find_preceding_element(index, predicates.newline) def _find_following_non_metadata(self, index): """ Returns the index to the following non-metadata element after the given index, or -Infinity. """ return self.__find_following_element(index, predicates.non_metadata) def _find_closing_square_bracket(self): """ Returns the index to the closing square bracket, or raises an Error. """ return self.__must_find_following_element(predicates.closing_square_bracket) def _find_following_opening_square_bracket(self, index): """ Returns the index to the opening square bracket, or -Infinity. """ return self.__find_following_element(index, predicates.opening_square_bracket) def _find_following_closing_square_bracket(self, index): """ Returns the index to the closing square bracket, or -Infinity. """ return self.__find_following_element(index, predicates.closing_square_bracket) def _find_following_table(self, index): """ Returns the index to the next TableElement after the specified index, or -Infinity. """ return self.__find_following_element(index, predicates.table) def _find_preceding_table(self, index): """ Returns the index to the preceding TableElement to the specified index, or -Infinity. """ return self.__find_preceding_element(index,predicates.table) def _find_closing_curly_bracket(self): """ Returns the index to the closing curly bracket, or raises an Error. """ def predicate(element): return isinstance(element, PunctuationElement) and element.token.type == TYPE_OP_CURLY_RIGHT_BRACKET return self.__must_find_following_element(predicate) def _find_following_table_header(self, index): """ Returns the index to the table header after the given element index, or -Infinity. """ return self.__find_following_element(index, predicates.table_header) def find_following(element_seq, predicate, index=None): """ Finds and returns the index of the next element fulfilling the specified predicate after the specified index, or -Infinity. Starts searching linearly from the start_from index. """ if isinstance(index, (int, float)) and index < 0: index = None for i, element in tuple(enumerate(element_seq))[index+1 if index is not None else index:]: if predicate(element): return i return float('-inf') def find_previous(element_seq, predicate, index=None): """ Finds and returns the index of the previous element fulfilling the specified predicate preceding to the specified index, or Infinity. """ if isinstance(index, (int, float)) and index >= len(element_seq): index = None for i, element in reversed(tuple(enumerate(element_seq))[:index]): if predicate(element): return i return float('inf') PKbL$#X<<5poetry/toml/prettify/elements/traversal/predicates.py """ The following predicates can be used in the traversal functions directly. """ from ...tokens import TYPE_OP_ASSIGNMENT from ...tokens import TYPE_OP_COMMA from ...tokens import TYPE_OP_SQUARE_LEFT_BRACKET from ...tokens import TYPE_OP_SQUARE_RIGHT_BRACKET from ..atomic import AtomicElement from ..metadata import PunctuationElement, CommentElement, NewlineElement, WhitespaceElement from .. import common atomic = lambda e: isinstance(e, AtomicElement) op_assignment = lambda e: isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_ASSIGNMENT op_comma = lambda e: isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_COMMA comment = lambda e: isinstance(e, CommentElement) newline = lambda e: isinstance(e, NewlineElement) non_metadata = lambda e: e.type != common.TYPE_METADATA closing_square_bracket = \ lambda e: isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_SQUARE_RIGHT_BRACKET opening_square_bracket = \ lambda e: isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_SQUARE_LEFT_BRACKET def table(e): from ..table import TableElement return isinstance(e, TableElement) def table_header(e): from ..tableheader import TableHeaderElement return isinstance(e, TableHeaderElement) whitespace = lambda e: isinstance(e, WhitespaceElement) PK~bL/|!!poetry/toml/prettify/errors.py class TOMLError(Exception): """ All errors raised by this module are descendants of this type. """ class InvalidTOMLFileError(TOMLError): pass class NoArrayFoundError(TOMLError): """ An array of tables was requested but none exist by the given name. """ class InvalidValueError(TOMLError): pass class DuplicateKeysError(TOMLError): """ Duplicate keys detected in the parsed file. """ class DuplicateTablesError(TOMLError): """ Duplicate tables detected in the parsed file. """ PKbL S&poetry/toml/prettify/lexer/__init__.py """ A regular expression based Lexer/tokenizer for TOML. """ from collections import namedtuple import re from .. import tokens from ..errors import TOMLError TokenSpec = namedtuple('TokenSpec', ('type', 're')) # Specs of all the valid tokens _LEXICAL_SPECS = ( TokenSpec(tokens.TYPE_COMMENT, re.compile(r'^(#.*)\n')), TokenSpec(tokens.TYPE_STRING, re.compile(r'^("(([^"]|\\")+?[^\\]|([^"]|\\")|)")')), # Single line only TokenSpec(tokens.TYPE_MULTILINE_STRING, re.compile(r'^(""".*?""")', re.DOTALL)), TokenSpec(tokens.TYPE_LITERAL_STRING, re.compile(r"^('.*?')")), TokenSpec(tokens.TYPE_MULTILINE_LITERAL_STRING, re.compile(r"^('''.*?''')", re.DOTALL)), TokenSpec(tokens.TYPE_BARE_STRING, re.compile(r'^([A-Za-z0-9_-]+)')), TokenSpec(tokens.TYPE_DATE, re.compile( r'^([0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]*)?)?(([zZ])|((\+|-)[0-9]{2}:[0-9]{2}))?)')), TokenSpec(tokens.TYPE_WHITESPACE, re.compile(r'^( |\t)', re.DOTALL)), TokenSpec(tokens.TYPE_INTEGER, re.compile(r'^(((\+|-)[0-9_]+)|([0-9][0-9_]*))')), TokenSpec(tokens.TYPE_FLOAT, re.compile(r'^((((\+|-)[0-9_]+)|([1-9][0-9_]*))(\.[0-9_]+)?([eE](\+|-)?[0-9_]+)?)')), TokenSpec(tokens.TYPE_BOOLEAN, re.compile(r'^(true|false)')), TokenSpec(tokens.TYPE_OP_SQUARE_LEFT_BRACKET, re.compile(r'^(\[)')), TokenSpec(tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, re.compile(r'^(\])')), TokenSpec(tokens.TYPE_OP_CURLY_LEFT_BRACKET, re.compile(r'^(\{)')), TokenSpec(tokens.TYPE_OP_CURLY_RIGHT_BRACKET, re.compile(r'^(\})')), TokenSpec(tokens.TYPE_OP_ASSIGNMENT, re.compile(r'^(=)')), TokenSpec(tokens.TYPE_OP_COMMA, re.compile(r'^(,)')), TokenSpec(tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET, re.compile(r'^(\[\[)')), TokenSpec(tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET, re.compile(r'^(\]\])')), TokenSpec(tokens.TYPE_OPT_DOT, re.compile(r'^(\.)')), TokenSpec(tokens.TYPE_NEWLINE, re.compile('^(\n|\r\n)')), ) def _next_token_candidates(source): matches = [] for token_spec in _LEXICAL_SPECS: match = token_spec.re.search(source) if match: matches.append(tokens.Token(token_spec.type, match.group(1))) return matches def _choose_from_next_token_candidates(candidates): if len(candidates) == 1: return candidates[0] elif len(candidates) > 1: # Return the maximal-munch with ties broken by natural order of token type. maximal_munch_length = max(len(token.source_substring) for token in candidates) maximal_munches = [token for token in candidates if len(token.source_substring) == maximal_munch_length] return sorted(maximal_munches)[0] # Return the first in sorting by priority def _munch_a_token(source): """ Munches a single Token instance if it could recognize one at the beginning of the given source text, or None if no token type could be recognized. """ candidates = _next_token_candidates(source) return _choose_from_next_token_candidates(candidates) class LexerError(TOMLError): def __init__(self, message): self._message = message def __repr__(self): return self._message def __str__(self): return self._message def tokenize(source, is_top_level=False): """ Tokenizes the input TOML source into a stream of tokens. If is_top_level is set to True, will make sure that the input source has a trailing newline character before it is tokenized. Raises a LexerError when it fails recognize another token while not at the end of the source. """ # Newlines are going to be normalized to UNIX newlines. source = source.replace('\r\n', '\n') if is_top_level and source and source[-1] != '\n': source += '\n' next_row = 1 next_col = 1 next_index = 0 while next_index < len(source): new_token = _munch_a_token(source[next_index:]) if not new_token: raise LexerError("failed to read the next token at ({}, {}): {}".format( next_row, next_col, source[next_index:])) # Set the col and row on the new token new_token = tokens.Token(new_token.type, new_token.source_substring, next_col, next_row) # Advance the index, row and col count next_index += len(new_token.source_substring) for c in new_token.source_substring: if c == '\n': next_row += 1 next_col = 1 else: next_col += 1 yield new_token PKbL"+'''poetry/toml/prettify/parser/__init__.py """ A parser for TOML tokens into TOML elements. """ from .elementsanitizer import sanitize from .errors import ParsingError from .parser import toml_file_elements from .tokenstream import TokenStream def parse_tokens(tokens): """ Parses the given token sequence into a sequence of top-level TOML elements. Raises ParserError on invalid TOML input. """ return _parse_token_stream(TokenStream(tokens)) def _parse_token_stream(token_stream): """ Parses the given token_stream into a sequence of top-level TOML elements. Raises ParserError on invalid input TOML. """ elements, pending = toml_file_elements(token_stream) if not pending.at_end: raise ParsingError('Failed to parse line {}'.format(pending.head.row)) return sanitize(elements) PKrbL0W/poetry/toml/prettify/parser/elementsanitizer.pyfrom ..elements import TYPE_METADATA from ..elements.table import TableElement from ..elements.tableheader import TableHeaderElement from ..errors import InvalidTOMLFileError from ..util import PeekableIterator def sanitize(_elements): """ Finds TableHeader elements that are not followed by TableBody elements and inserts empty TableElement right after those. """ output = list(_elements) def find_next_table_header(after=-1): return next((i for (i, element) in enumerate(output) if i > after and isinstance(element, TableHeaderElement)), float('-inf')) def find_next_table_body(after=-1): return next((i for (i, element) in enumerate(output) if i > after and isinstance(element, TableElement)), float('-inf')) next_table_header_i = find_next_table_header() while next_table_header_i >= 0: following_table_header_i = find_next_table_header(next_table_header_i) following_table_body_i = find_next_table_body(next_table_header_i) if (following_table_body_i < 0) or \ (following_table_header_i >= 0 and (following_table_header_i < following_table_body_i)): output.insert(next_table_header_i+1, TableElement(tuple())) next_table_header_i = find_next_table_header(next_table_header_i) return output def validate_sanitized(_elements): # Non-metadata elements must start with an optional TableElement, # followed by zero or more (TableHeaderElement, TableElement) pairs. if not _elements: return it = PeekableIterator(e for e in _elements if e.type != TYPE_METADATA) if isinstance(it.peek(), TableElement): it.next() while it.peek(): if not isinstance(it.peek(), TableHeaderElement): raise InvalidTOMLFileError it.next() if not isinstance(it.peek(), TableElement): raise InvalidTOMLFileError it.next() PKbL%poetry/toml/prettify/parser/errors.pyfrom ..errors import TOMLError class ParsingError(TOMLError): def __init__(self, message='', token=None): self.message = message self.token = token def __repr__(self): if self.message and self.token: return "{} at row {} and col {}".format( self.message, self.token.row, self.token.col ) else: return self.message def __str__(self): return repr(self) PK]bL@(66%poetry/toml/prettify/parser/parser.py """ A Recursive Descent implementation of a lexical parser for TOML. Grammar: -------- Newline -> NEWLINE Comment -> COMMENT Newline LineTerminator -> Comment | Newline Space -> WHITESPACE Space | WHITESPACE | EMPTY TableHeader -> Space [ Space TableHeaderName Space ] Space LineTerminator | Space [[ Space TableHeaderName Space ]] Space LineTerminator TableHeaderName -> STRING Space '.' Space TableHeaderName | STRING Atomic -> STRING | INTEGER | FLOAT | DATE | BOOLEAN Array -> '[' Space ArrayInternal Space ']' | '[' Space ArrayInternal Space LineTerminator Space ']' ArrayInternal -> LineTerminator Space ArrayInternal | Value Space ',' Space LineTerminator Space ArrayInternal | Value Space ',' Space ArrayInternal | LineTerminator | Value | EMPTY InlineTable -> '{' Space InlineTableInternal Space '}' InlineTableKeyValuePair = STRING Space '=' Space Value InlineTableInternal -> InlineTableKeyValuePair Space ',' Space InlineTableInternal | InlineTableKeyValuePair | Empty Value -> Atomic | InlineTable | Array KeyValuePair -> Space STRING Space '=' Space Value Space LineTerminator TableBody -> KeyValuePair TableBody | EmptyLine TableBody | EmptyLine | KeyValuePair EmptyLine -> Space LineTerminator FileEntry -> TableHeader | TableBody TOMLFileElements -> FileEntry TOMLFileElements | FileEntry | EmptyLine | EMPTY """ from ..elements.array import ArrayElement from ..elements.atomic import AtomicElement from ..elements.inlinetable import InlineTableElement from ..elements.metadata import NewlineElement, CommentElement, WhitespaceElement, PunctuationElement from ..elements.table import TableElement from ..elements.tableheader import TableHeaderElement from ..tokens import TYPE_BARE_STRING from ..tokens import TYPE_BOOLEAN from ..tokens import TYPE_COMMENT from ..tokens import TYPE_DATE from ..tokens import TYPE_FLOAT from ..tokens import TYPE_INTEGER from ..tokens import TYPE_LITERAL_STRING from ..tokens import TYPE_MULTILINE_LITERAL_STRING from ..tokens import TYPE_MULTILINE_STRING from ..tokens import TYPE_NEWLINE from ..tokens import TYPE_OP_ASSIGNMENT from ..tokens import TYPE_OP_COMMA from ..tokens import TYPE_OP_CURLY_LEFT_BRACKET from ..tokens import TYPE_OP_CURLY_RIGHT_BRACKET from ..tokens import TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET from ..tokens import TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET from ..tokens import TYPE_OP_SQUARE_LEFT_BRACKET from ..tokens import TYPE_OP_SQUARE_RIGHT_BRACKET from ..tokens import TYPE_OPT_DOT from ..tokens import TYPE_STRING from ..tokens import TYPE_WHITESPACE from .recdesc import capture_from from .errors import ParsingError """ Non-terminals are represented as functions which return (RESULT, pending_token_stream), or raise ParsingError. """ def token(token_type): def factory(ts): t = ts.head if t.type != token_type: raise ParsingError('Expected a token of type {}'.format(token_type)) return t, ts.tail return factory def newline_element(token_stream): """ Returns NewlineElement, pending_token_stream or raises ParsingError. """ captured = capture_from(token_stream).find(token(TYPE_NEWLINE)) return NewlineElement(captured.value()), captured.pending_tokens def comment_tokens(ts1): c1 = capture_from(ts1).find(token(TYPE_COMMENT)).and_find(token(TYPE_NEWLINE)) return c1.value(), c1.pending_tokens def comment_element(token_stream): """ Returns CommentElement, pending_token_stream or raises ParsingError. """ captured = capture_from(token_stream).find(comment_tokens) return CommentElement(captured.value()), captured.pending_tokens def line_terminator_tokens(token_stream): captured = capture_from(token_stream).find(comment_tokens).or_find(token(TYPE_NEWLINE)) return captured.value(), captured.pending_tokens def line_terminator_element(token_stream): captured = capture_from(token_stream).find(comment_element).or_find(newline_element) return captured.value('Expected a comment or a newline')[0], captured.pending_tokens def zero_or_more_tokens(token_type): def factory(token_stream): def more(ts): c = capture_from(ts).find(token(token_type)).and_find(zero_or_more_tokens(token_type)) return c.value(), c.pending_tokens def two(ts): c = capture_from(ts).find(token(TYPE_WHITESPACE)) return c.value(), c.pending def zero(ts): return tuple(), ts captured = capture_from(token_stream).find(more).or_find(two).or_find(zero) return captured.value(), captured.pending_tokens return factory def space_element(token_stream): captured = capture_from(token_stream).find(zero_or_more_tokens(TYPE_WHITESPACE)) return WhitespaceElement([t for t in captured.value() if t]), captured.pending_tokens def string_token(token_stream): captured = capture_from(token_stream).\ find(token(TYPE_BARE_STRING)).\ or_find(token(TYPE_STRING)).\ or_find(token(TYPE_LITERAL_STRING)).\ or_find(token(TYPE_MULTILINE_STRING)).\ or_find(token(TYPE_MULTILINE_LITERAL_STRING)) return captured.value('Expected a string'), captured.pending_tokens def string_element(token_stream): captured = capture_from(token_stream).find(string_token) return AtomicElement(captured.value()), captured.pending_tokens def table_header_name_tokens(token_stream): def one(ts): c = capture_from(ts).\ find(string_token).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(token(TYPE_OPT_DOT)).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(table_header_name_tokens) return c.value(), c.pending_tokens captured = capture_from(token_stream).find(one).or_find(string_token) return captured.value(), captured.pending_tokens def table_header_element(token_stream): def single(ts1): c1 = capture_from(ts1).\ find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(token(TYPE_OP_SQUARE_LEFT_BRACKET)).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(table_header_name_tokens).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(token(TYPE_OP_SQUARE_RIGHT_BRACKET)).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(line_terminator_tokens) return c1.value(), c1.pending_tokens def double(ts2): c2 = capture_from(ts2).\ find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(token(TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET)).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(table_header_name_tokens).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(token(TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET)).\ and_find(zero_or_more_tokens(TYPE_WHITESPACE)).\ and_find(line_terminator_tokens) return c2.value(), c2.pending_tokens captured = capture_from(token_stream).find(single).or_find(double) return TableHeaderElement(captured.value()), captured.pending_tokens def atomic_element(token_stream): captured = capture_from(token_stream).\ find(string_token).\ or_find(token(TYPE_INTEGER)).\ or_find(token(TYPE_FLOAT)).\ or_find(token(TYPE_DATE)).\ or_find(token(TYPE_BOOLEAN)) return AtomicElement(captured.value('Expected an atomic primitive value')), captured.pending_tokens def punctuation_element(token_type): def factory(ts): c = capture_from(ts).find(token(token_type)) return PunctuationElement(c.value('Expected the punctuation element: {}'.format(token_type))), c.pending_tokens return factory def value(token_stream): captured = capture_from(token_stream).\ find(atomic_element).\ or_find(array_element).\ or_find(inline_table_element) return captured.value('Expected a primitive value, array or an inline table'), captured.pending_tokens def array_internal(ts): def zero(ts0): c = capture_from(ts0).\ and_find(line_terminator_element).\ and_find(space_element).\ and_find(array_internal) return c.value(), c.pending_tokens def one(ts1): c = capture_from(ts1).\ find(value).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_COMMA)).\ and_find(space_element).\ and_find(line_terminator_element).\ and_find(space_element).\ and_find(array_internal) return c.value(), c.pending_tokens def two(ts2): c = capture_from(ts2).\ find(value).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_COMMA)).\ and_find(space_element).\ and_find(array_internal) return c.value(), c.pending_tokens def three(ts3): c = capture_from(ts3).\ find(space_element).\ and_find(line_terminator_element) return c.value(), c.pending_tokens captured = capture_from(ts).find(zero).or_find(one).or_find(two).or_find(three).or_find(value).or_empty() return captured.value(), captured.pending_tokens def array_element(token_stream): def one(ts1): ca = capture_from(ts1).\ find(punctuation_element(TYPE_OP_SQUARE_LEFT_BRACKET)).\ and_find(space_element).\ and_find(array_internal).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_SQUARE_RIGHT_BRACKET)) return ca.value(), ca.pending_tokens def two(ts2): ca = capture_from(ts2).\ find(punctuation_element(TYPE_OP_SQUARE_LEFT_BRACKET)).\ and_find(space_element).\ and_find(array_internal).\ and_find(space_element).\ and_find(line_terminator_element).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_SQUARE_RIGHT_BRACKET)) return ca.value(), ca.pending_tokens captured = capture_from(token_stream).find(one).or_find(two) return ArrayElement(captured.value()), captured.pending_tokens def inline_table_element(token_stream): # InlineTableElement -> '{' Space InlineTableInternal Space '}' # InlineTableKeyValuePair = STRING Space '=' Space Value # InlineTableInternal -> InlineTableKeyValuePair Space ',' Space InlineTableInternal | # InlineTableKeyValuePair | Empty def key_value(ts): ca = capture_from(ts).\ find(string_element).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_ASSIGNMENT)).\ and_find(space_element).\ and_find(value) return ca.value(), ca.pending_tokens def internal(ts): def one(ts1): c1 = capture_from(ts1).\ find(key_value).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_COMMA)).\ and_find(space_element).\ and_find(internal) return c1.value(), c1.pending_tokens c = capture_from(ts).find(one).or_find(key_value).or_empty() return c.value(), c.pending_tokens captured = capture_from(token_stream).\ find(punctuation_element(TYPE_OP_CURLY_LEFT_BRACKET)).\ and_find(space_element).\ and_find(internal).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_CURLY_RIGHT_BRACKET)) return InlineTableElement(captured.value()), captured.pending_tokens def key_value_pair(token_stream): captured = capture_from(token_stream).\ find(space_element).\ and_find(string_element).\ and_find(space_element).\ and_find(punctuation_element(TYPE_OP_ASSIGNMENT)).\ and_find(space_element).\ and_find(value).\ and_find(space_element).\ and_find(line_terminator_element) return captured.value(), captured.pending_tokens def table_body_elements(token_stream): # TableBody -> KeyValuePair TableBody | EmptyLine TableBody | EmptyLine | KeyValuePair def one(ts1): c = capture_from(ts1).\ find(key_value_pair).\ and_find(table_body_elements) return c.value(), c.pending_tokens def two(ts2): c = capture_from(ts2).\ find(empty_line_elements).\ and_find(table_body_elements) return c.value(), c.pending_tokens captured = capture_from(token_stream).\ find(one).\ or_find(two).\ or_find(empty_line_elements).\ or_find(key_value_pair) return captured.value(), captured.pending_tokens def table_body_element(token_stream): captured = capture_from(token_stream).find(table_body_elements) return TableElement(captured.value()), captured.pending_tokens def empty_line_tokens(ts1): c1 = capture_from(ts1).find(space_element).and_find(line_terminator_element) return c1.value(), c1.pending_tokens def empty_line_elements(token_stream): captured = capture_from(token_stream).find(empty_line_tokens) return captured.value(), captured.pending_tokens def file_entry_element(token_stream): captured = capture_from(token_stream).find(table_header_element).\ or_find(table_body_element) return captured.value(), captured.pending_tokens def toml_file_elements(token_stream): def one(ts1): c1 = capture_from(ts1).find(file_entry_element).and_find(toml_file_elements) return c1.value(), c1.pending_tokens captured = capture_from(token_stream).find(one).or_find(file_entry_element).or_empty() return captured.value(), captured.pending_tokens PKecL^/**&poetry/toml/prettify/parser/recdesc.pyfrom ..elements.array import ArrayElement from .errors import ParsingError from .tokenstream import TokenStream class Capturer: """ Recursive-descent matching DSL. Yeah.. """ def __init__(self, token_stream, value=tuple(), dormant_error=None): self._token_stream = token_stream self._value = value self._dormant_error = dormant_error def find(self, finder): """ Searches the token stream using the given finder. `finder(ts)` is a function that accepts a `TokenStream` instance and returns `(element, pending_ts)` where `element` is the found "something" or a sequence of "somethings", and `pending_ts` the unconsumed `TokenStream`. `finder(ts)` can raise `ParsingError` to indicate that it couldn't find anything, or a `TokenStream.EndOfStream` to indicate a premature end of the TokenStream. This method returns a Capturer instance that can be further used to find more and more "somethings". The value at any given moment can be retrieved via the `Capturer.value()` method. """ try: # Execute finder! element, pending_ts = finder(self._token_stream) # If result is not a sequence, make it so if isinstance(element, ArrayElement) or not isinstance(element, (tuple, list)): element = (element,) # Return a Capturer with accumulated findings return Capturer(pending_ts, value=self.value() + element) except ParsingError as e: # Failed to find, store error in returned value return Capturer(self._token_stream, dormant_error=e) except TokenStream.EndOfStream as e: # Premature end of stream, store error in returned value return Capturer(self._token_stream, dormant_error=e) def value(self, parsing_expectation_msg=None): """ Returns the accumulated values found as a sequence of values, or raises an encountered dormant error. If parsing_expectation_msg is specified and a dormant_error is a ParsingError, the expectation message is used instead in it. """ if self._dormant_error: if parsing_expectation_msg and isinstance(self._dormant_error, ParsingError): raise ParsingError(parsing_expectation_msg, token=self._token_stream.head) else: raise self._dormant_error return self._value @property def pending_tokens(self): """ Returns a TokenStream with the pending tokens yet to be processed. """ return self._token_stream def or_find(self, finder): """ If a dormant_error is present, try this new finder instead. If not, does nothing. """ if self._dormant_error: return Capturer(self._token_stream).find(finder) else: return self def or_end_of_file(self): """ Discards any errors if at end of the stream. """ if isinstance(self._dormant_error, TokenStream.EndOfStream): return Capturer(self.pending_tokens, value=self._value) else: return self def or_empty(self): """ Discards any previously-encountered dormant error. """ if self._dormant_error: return Capturer(self.pending_tokens, value=self._value) else: return self def and_find(self, finder): """ Accumulate new "somethings" to the stored value using the given finder. """ if self._dormant_error: return Capturer(self.pending_tokens, dormant_error=self._dormant_error) return Capturer(self.pending_tokens, self.value()).find(finder) def capture_from(token_stream): return Capturer(token_stream) PKebLAUU*poetry/toml/prettify/parser/tokenstream.pyclass TokenStream: """ An immutable subset of a token sequence """ class EndOfStream(Exception): pass Nothing = tuple() def __init__(self, _tokens, offset=0): if isinstance(_tokens, tuple): self._tokens = _tokens else: self._tokens = tuple(_tokens) self._head_index = offset def __len__(self): return len(self._tokens) - self.offset @property def head(self): try: return self._tokens[self._head_index] except IndexError: raise TokenStream.EndOfStream @property def tail(self): return TokenStream(self._tokens, offset=self._head_index+1) @property def offset(self): return self._head_index @property def at_end(self): return self.offset >= len(self._tokens) PK~bLamm'poetry/toml/prettify/tokens/__init__.py """ TOML lexical tokens. """ class TokenType: """ A TokenType is a concrete type of a source token along with a defined priority and a higher-order kind. The priority will be used in determining the tokenization behaviour of the lexer in the following manner: whenever more than one token is recognizable as the next possible token and they are all of equal source length, this priority is going to be used to break the tie by favoring the token type of the lowest priority value. A TokenType instance is naturally ordered by its priority. """ def __init__(self, name, priority, is_metadata): self._priority = priority self._name = name self._is_metadata = is_metadata @property def is_metadata(self): return self._is_metadata @property def priority(self): return self._priority def __repr__(self): return "{}-{}".format(self.priority, self._name) def __lt__(self, other): return isinstance(other, TokenType) and self._priority < other.priority # Possible types of tokens TYPE_BOOLEAN = TokenType('boolean', 0, is_metadata=False) TYPE_INTEGER = TokenType('integer', 0, is_metadata=False) TYPE_OP_COMMA = TokenType('comma', 0, is_metadata=True) TYPE_OP_SQUARE_LEFT_BRACKET = TokenType('square_left_bracket', 0, is_metadata=True) TYPE_OP_SQUARE_RIGHT_BRACKET = TokenType('square_right_bracket', 0, is_metadata=True) TYPE_OP_CURLY_LEFT_BRACKET = TokenType('curly_left_bracket', 0, is_metadata=True) TYPE_OP_CURLY_RIGHT_BRACKET = TokenType('curly_right_bracket', 0, is_metadata=True) TYPE_OP_ASSIGNMENT = TokenType('assignment', 0, is_metadata=True) TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET = TokenType('double_square_left_bracket', 0, is_metadata=True) TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET = TokenType('double_square_right_bracket', 0, is_metadata=True) TYPE_FLOAT = TokenType('float', 1, is_metadata=False) TYPE_DATE = TokenType('date', 40, is_metadata=False) TYPE_OPT_DOT = TokenType('dot', 40, is_metadata=True) TYPE_BARE_STRING = TokenType('bare_string', 50, is_metadata=False) TYPE_STRING = TokenType('string', 90, is_metadata=False) TYPE_MULTILINE_STRING = TokenType('multiline_string', 90, is_metadata=False) TYPE_LITERAL_STRING = TokenType('literal_string', 90, is_metadata=False) TYPE_MULTILINE_LITERAL_STRING = TokenType('multiline_literal_string', 90, is_metadata=False) TYPE_NEWLINE = TokenType('newline', 91, is_metadata=True) TYPE_WHITESPACE = TokenType('whitespace', 93, is_metadata=True) TYPE_COMMENT = TokenType('comment', 95, is_metadata=True) def is_operator(token): """ Returns True if the given token is an operator token. """ return token.type in ( TYPE_OP_COMMA, TYPE_OP_SQUARE_LEFT_BRACKET, TYPE_OP_SQUARE_RIGHT_BRACKET, TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET, TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET, TYPE_OP_CURLY_LEFT_BRACKET, TYPE_OP_CURLY_RIGHT_BRACKET, TYPE_OP_ASSIGNMENT, TYPE_OPT_DOT, ) def is_string(token): return token.type in ( TYPE_STRING, TYPE_MULTILINE_STRING, TYPE_LITERAL_STRING, TYPE_BARE_STRING, TYPE_MULTILINE_LITERAL_STRING ) class Token: """ A token/lexeme in a TOML source file. A Token instance is naturally ordered by its type. """ def __init__(self, _type, source_substring, col=None, row=None): self._source_substring = source_substring self._type = _type self._col = col self._row = row def __eq__(self, other): if not isinstance(other, Token): return False return self.source_substring == other.source_substring and self.type == other.type @property def col(self): """ Column number (1-indexed). """ return self._col @property def row(self): """ Row number (1-indexed). """ return self._row @property def type(self): """ One of of the TOKEN_TYPE_* constants. """ return self._type @property def source_substring(self): """ The substring of the initial source file containing this token. """ return self._source_substring def __lt__(self, other): return isinstance(other, Token) and self.type < other.type def __repr__(self): return "{}: {}".format(self.type, self.source_substring) PK2bLV%poetry/toml/prettify/tokens/errors.pyfrom ..errors import TOMLError class DeserializationError(TOMLError): pass class BadEscapeCharacter(TOMLError): pass class MalformedDateError(DeserializationError): pass PKecLv#&&poetry/toml/prettify/tokens/py2toml.py """ A converter of python values to TOML Token instances. """ import codecs import datetime import six import re from .. import tokens from ..errors import TOMLError from ..tokens import Token from ..util import chunkate_string class NotPrimitiveError(TOMLError): pass _operator_tokens_by_type = { tokens.TYPE_OP_SQUARE_LEFT_BRACKET: tokens.Token(tokens.TYPE_OP_SQUARE_LEFT_BRACKET, u'['), tokens.TYPE_OP_SQUARE_RIGHT_BRACKET: tokens.Token(tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, u']'), tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET: tokens.Token(tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET, u'[['), tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET: tokens.Token(tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET, u']]'), tokens.TYPE_OP_COMMA: tokens.Token(tokens.TYPE_OP_COMMA, u','), tokens.TYPE_NEWLINE: tokens.Token(tokens.TYPE_NEWLINE, u'\n'), tokens.TYPE_OPT_DOT: tokens.Token(tokens.TYPE_OPT_DOT, u'.'), } def operator_token(token_type): return _operator_tokens_by_type[token_type] def create_primitive_token(value, multiline_strings_allowed=True): """ Creates and returns a single token for the given primitive atomic value. Raises NotPrimitiveError when the given value is not a primitive atomic value """ if value is None: return create_primitive_token('') elif isinstance(value, bool): return tokens.Token(tokens.TYPE_BOOLEAN, u'true' if value else u'false') elif isinstance(value, int): return tokens.Token(tokens.TYPE_INTEGER, u'{}'.format(value)) elif isinstance(value, float): return tokens.Token(tokens.TYPE_FLOAT, u'{}'.format(value)) elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): return tokens.Token(tokens.TYPE_DATE, strict_rfc3339.timestamp_to_rfc3339_utcoffset(ts)) elif isinstance(value, six.string_types): return create_string_token(value, multiline_strings_allowed=multiline_strings_allowed) raise NotPrimitiveError("{} of type {}".format(value, type(value))) _bare_string_regex = re.compile('^[a-zA-Z0-9_-]*$') def create_string_token(text, bare_string_allowed=False, multiline_strings_allowed=True): """ Creates and returns a single string token. Raises ValueError on non-string input. """ if not isinstance(text, six.string_types): raise ValueError('Given value must be a string') if text == '': return tokens.Token(tokens.TYPE_STRING, '""'.format(_escape_single_line_quoted_string(text))) elif bare_string_allowed and _bare_string_regex.match(text): return tokens.Token(tokens.TYPE_BARE_STRING, text) elif multiline_strings_allowed and (len(tuple(c for c in text if c == '\n')) >= 2 or len(text) > 80): # If containing two or more newlines or is longer than 80 characters we'll use the multiline string format return _create_multiline_string_token(text) else: return tokens.Token(tokens.TYPE_STRING, '"{}"'.format(_escape_single_line_quoted_string(text))) def _escape_single_line_quoted_string(text): if six.PY2: return text.encode('unicode-escape').encode('string-escape').replace('"', '\\"').replace("\\'", "'") else: return codecs.encode(text, 'unicode-escape').decode().replace('"', '\\"') def _create_multiline_string_token(text): escaped = text.replace(u'"""', u'\"\"\"') if len(escaped) > 50: return tokens.Token(tokens.TYPE_MULTILINE_STRING, u'"""\n{}\\\n"""'.format(_break_long_text(escaped))) else: return tokens.Token(tokens.TYPE_MULTILINE_STRING, u'"""{}"""'.format(escaped)) def _break_long_text(text, maximum_length=75): """ Breaks into lines of 75 character maximum length that are terminated by a backslash. """ def next_line(remaining_text): # Returns a line and the remaining text if '\n' in remaining_text and remaining_text.index('\n') < maximum_length: i = remaining_text.index('\n') return remaining_text[:i+1], remaining_text[i+2:] elif len(remaining_text) > maximum_length and ' ' in remaining_text: i = remaining_text[:maximum_length].rfind(' ') return remaining_text[:i+1] + '\\\n', remaining_text[i+2:] else: return remaining_text, '' remaining_text = text lines = [] while remaining_text: line, remaining_text = next_line(remaining_text) lines += [line] return ''.join(lines) def create_whitespace(source_substring): return Token(tokens.TYPE_WHITESPACE, source_substring) def create_multiline_string(text, maximum_line_length=120): def escape(t): return t.replace(u'"""', six.u(r'\"\"\"')) source_substring = u'"""\n{}"""'.format(u'\\\n'.join(chunkate_string(escape(text), maximum_line_length))) return Token(tokens.TYPE_MULTILINE_STRING, source_substring) PKbL] &poetry/toml/prettify/tokens/toml2py.pyimport codecs import functools import operator import re import string from . import TYPE_BOOLEAN, TYPE_INTEGER, TYPE_FLOAT, TYPE_DATE, \ TYPE_MULTILINE_STRING, TYPE_BARE_STRING, TYPE_MULTILINE_LITERAL_STRING, TYPE_LITERAL_STRING, \ TYPE_STRING from .errors import MalformedDateError from .errors import BadEscapeCharacter def deserialize(token): """ Deserializes the value of a single tokens.Token instance based on its type. Raises DeserializationError when appropriate. """ if token.type == TYPE_BOOLEAN: return _to_boolean(token) elif token.type == TYPE_INTEGER: return _to_int(token) elif token.type == TYPE_FLOAT: return _to_float(token) elif token.type == TYPE_DATE: return _to_date(token) elif token.type in (TYPE_STRING, TYPE_MULTILINE_STRING, TYPE_BARE_STRING, TYPE_LITERAL_STRING, TYPE_MULTILINE_LITERAL_STRING): return _to_string(token) else: raise Exception('This should never happen!') def _unescape_str(text): """ Unescapes a string according the TOML spec. Raises BadEscapeCharacter when appropriate. """ # Detect bad escape jobs bad_escape_regexp = re.compile(r'([^\\]|^)\\[^btnfr"\\uU]') if bad_escape_regexp.findall(text): raise BadEscapeCharacter # Do the unescaping return codecs.decode(_unicode_escaped_string(text), 'unicode-escape') def _unicode_escaped_string(text): """ Escapes all unicode characters in the given string """ def is_unicode(c): return c.lower() not in string.ascii_letters + string.whitespace + string.punctuation + string.digits def escape_unicode_char(x): return codecs.encode(x, 'unicode-escape') if any(is_unicode(c) for c in text): homogeneous_chars = tuple(escape_unicode_char(c) if is_unicode(c) else c.encode() for c in text) homogeneous_bytes = functools.reduce(operator.add, homogeneous_chars) return homogeneous_bytes.decode() else: return text def _to_string(token): if token.type == TYPE_BARE_STRING: return token.source_substring elif token.type == TYPE_STRING: escaped = token.source_substring[1:-1] return _unescape_str(escaped) elif token.type == TYPE_MULTILINE_STRING: escaped = token.source_substring[3:-3] # Drop the first newline if existed if escaped and escaped[0] == '\n': escaped = escaped[1:] # Remove all occurrences of a slash-newline-zero-or-more-whitespace patterns escaped = re.sub(r'\\\n\s*', repl='', string=escaped, flags=re.DOTALL) return _unescape_str(escaped) elif token.type == TYPE_LITERAL_STRING: return token.source_substring[1:-1] elif token.type == TYPE_MULTILINE_LITERAL_STRING: text = token.source_substring[3:-3] if text[0] == '\n': text = text[1:] return text raise RuntimeError('Control should never reach here.') def _to_int(token): return int(token.source_substring.replace('_', '')) def _to_float(token): assert token.type == TYPE_FLOAT string = token.source_substring.replace('_', '') return float(string) def _to_boolean(token): return token.source_substring == 'true' _correct_date_format = re.compile( r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(Z|([+-])(\d{2}):(\d{2}))' ) def _to_date(token): if not _correct_date_format.match(token.source_substring): raise MalformedDateError return token.source_substring PK7bLyۚpoetry/toml/prettify/util.pyimport itertools def is_sequence_like(x): """ Returns True if x exposes a sequence-like interface. """ required_attrs = ( '__len__', '__getitem__' ) return all(hasattr(x, attr) for attr in required_attrs) def is_dict_like(x): """ Returns True if x exposes a dict-like interface. """ required_attrs = ( '__len__', '__getitem__', 'keys', 'values', ) return all(hasattr(x, attr) for attr in required_attrs) def join_with(iterable, separator): """ Joins elements from iterable with separator and returns the produced sequence as a list. separator must be addable to a list. """ inputs = list(iterable) b = [] for i, element in enumerate(inputs): if isinstance(element, (list, tuple, set)): b += tuple(element) else: b += [element] if i < len(inputs)-1: b += separator return b def chunkate_string(text, length): """ Iterates over the given seq in chunks of at maximally the given length. Will never break a whole word. """ iterator_index = 0 def next_newline(): try: return next(i for (i, c) in enumerate(text) if i > iterator_index and c == '\n') except StopIteration: return len(text) def next_breaker(): try: return next(i for (i, c) in reversed(tuple(enumerate(text))) if i >= iterator_index and (i < iterator_index+length) and c in (' ', '\t')) except StopIteration: return len(text) while iterator_index < len(text): next_chunk = text[iterator_index:min(next_newline(), next_breaker()+1)] iterator_index += len(next_chunk) yield next_chunk def flatten_nested(nested_dicts): """ Flattens dicts and sequences into one dict with tuples of keys representing the nested keys. Example >>> dd = { \ 'dict1': {'name': 'Jon', 'id': 42}, \ 'dict2': {'name': 'Sam', 'id': 41}, \ 'seq1': [{'one': 1, 'two': 2}] \ } >>> flatten_nested(dd) == { \ ('dict1', 'name'): 'Jon', ('dict1', 'id'): 42, \ ('dict2', 'name'): 'Sam', ('dict2', 'id'): 41, \ ('seq1', 0, 'one'): 1, ('seq1', 0, 'two'): 2, \ } True """ assert isinstance(nested_dicts, (dict, list, tuple)), 'Only works with a collection parameter' def items(c): if isinstance(c, dict): return c.items() elif isinstance(c, (list, tuple)): return enumerate(c) else: raise RuntimeError('c must be a collection') def flatten(dd): output = {} for k, v in items(dd): if isinstance(v, (dict, list, tuple)): for child_key, child_value in flatten(v).items(): output[(k,) + child_key] = child_value else: output[(k,)] = v return output return flatten(nested_dicts) class PeekableIterator: # Returned by peek() when the iterator is exhausted. Truthiness is False. Nothing = tuple() def __init__(self, iter): self._iter = iter def __next__(self): return next(self._iter) def next(self): return self.__next__() def __iter__(self): return self def peek(self): """ Returns PeekableIterator.Nothing when the iterator is exhausted. """ try: v = next(self._iter) self._iter = itertools.chain((v,), self._iter) return v except StopIteration: return PeekableIterator.Nothing PKbL)poetry/toml/raw.pyfrom .prettify.elements.abstracttable import AbstractTable def to_raw(x): from .cascadedict import CascadeDict if isinstance(x, AbstractTable): return x.primitive_value elif isinstance(x, CascadeDict): return x.neutralized elif isinstance(x, (list, tuple)): return [to_raw(y) for y in x] elif isinstance(x, dict): return {k: to_raw(v) for (k, v) in x.items()} else: return x PKڣdLNNpoetry/toml/structurer.pyfrom . import toplevels from .cascadedict import CascadeDict class NamedDict(dict): """ A dict that can use Name instances as keys. """ def __init__(self, other_dict=None): dict.__init__(self) if other_dict: for k, v in other_dict.items(): self[k] = v def __setitem__(self, key, value): """ key can be an Name instance. When key is a path in the form of an Name instance, all the parents and grandparents of the value are created along the way as instances of NamedDict. If the parent of the value exists, it is replaced with a CascadeDict() that cascades the old parent value with a new NamedDict that contains the given child name and value. """ if isinstance(key, toplevels.Name): obj = self for i, name in enumerate(key.sub_names): if name in obj: if i == len(key.sub_names) - 1: obj[name] = CascadeDict(obj[name], value) else: obj[name] = CascadeDict(NamedDict(), obj[name]) else: if i == len(key.sub_names) - 1: obj[name] = value else: obj[name] = NamedDict() obj = obj[name] else: return dict.__setitem__(self, key, value) def __contains__(self, item): try: _ = self[item] return True except KeyError: return False def append(self, key, value): """ Makes sure the value pointed to by key exists and is a list and appends the given value to it. """ if key in self: self[key].append(value) else: self[key] = [value] def __getitem__(self, item): if isinstance(item, toplevels.Name): d = self for name in item.sub_names: d = d[name] return d else: return dict.__getitem__(self, item) def __eq__(self, other): return dict.__eq__(self, other) def structure(table_toplevels): """ Accepts an ordered sequence of TopLevel instances and returns a navigable object structure representation of the TOML file. """ table_toplevels = tuple(table_toplevels) obj = NamedDict() last_array_of_tables = None # The Name of the last array-of-tables header for toplevel in table_toplevels: if isinstance(toplevel, toplevels.AnonymousTable): obj[''] = toplevel.table_element elif isinstance(toplevel, toplevels.Table): if last_array_of_tables and toplevel.name.is_prefixed_with(last_array_of_tables): seq = obj[last_array_of_tables] unprefixed_name = toplevel.name.without_prefix(last_array_of_tables) seq[-1] = CascadeDict(seq[-1], NamedDict({unprefixed_name: toplevel.table_element})) else: obj[toplevel.name] = toplevel.table_element else: # It's an ArrayOfTables if last_array_of_tables and toplevel.name != last_array_of_tables and \ toplevel.name.is_prefixed_with(last_array_of_tables): seq = obj[last_array_of_tables] unprefixed_name = toplevel.name.without_prefix(last_array_of_tables) if unprefixed_name in seq[-1]: seq[-1][unprefixed_name].append(toplevel.table_element) else: cascaded_with = NamedDict({unprefixed_name: [toplevel.table_element]}) seq[-1] = CascadeDict(seq[-1], cascaded_with) else: obj.append(toplevel.name, toplevel.table_element) last_array_of_tables = toplevel.name return obj PK,dL{%%poetry/toml/toml_file.pyfrom .prettify.errors import NoArrayFoundError from . import structurer, toplevels, raw from .array import ArrayOfTables from .freshtable import FreshTable from .prettify.elements import factory as element_factory from .prettify import util class TOMLFile(dict): """ A TOMLFile object that tries its best to prserve formatting and order of mappings of the input source. Raises InvalidTOMLFileError on invalid input elements. Raises DuplicateKeysError, DuplicateTableError when appropriate. """ def __init__(self, _elements): self._elements = [] self._navigable = {} self.append_elements(_elements) def __getitem__(self, item): try: value = self._navigable[item] if isinstance(value, (list, tuple)): return ArrayOfTables(toml_file=self, name=item, iterable=value) else: return value except KeyError: return FreshTable(parent=self, name=item, is_array=False) def __contains__(self, item): return item in self.keys() def _setitem_with_key_seq(self, key_seq, value): """ Sets a the value in the TOML file located by the given key sequence. Example: self._setitem(('key1', 'key2', 'key3'), 'text_value') is equivalent to doing self['key1']['key2']['key3'] = 'text_value' """ table = self key_so_far = tuple() for key in key_seq[:-1]: key_so_far += (key,) self._make_sure_table_exists(key_so_far) table = table[key] table[key_seq[-1]] = value def _array_setitem_with_key_seq(self, array_name, index, key_seq, value): """ Sets a the array value in the TOML file located by the given key sequence. Example: self._array_setitem(array_name, index, ('key1', 'key2', 'key3'), 'text_value') is equivalent to doing self.array(array_name)[index]['key1']['key2']['key3'] = 'text_value' """ table = self.array(array_name)[index] key_so_far = tuple() for key in key_seq[:-1]: key_so_far += (key,) new_table = self._array_make_sure_table_exists(array_name, index, key_so_far) if new_table is not None: table = new_table else: table = table[key] table[key_seq[-1]] = value def _make_sure_table_exists(self, name_seq): """ Makes sure the table with the full name comprising of name_seq exists. """ t = self for key in name_seq[:-1]: t = t[key] name = name_seq[-1] if name not in t: self.append_elements([element_factory.create_table_header_element(name_seq), element_factory.create_table({})]) def _array_make_sure_table_exists(self, array_name, index, name_seq): """ Makes sure the table with the full name comprising of name_seq exists. """ t = self[array_name][index] for key in name_seq[:-1]: t = t[key] name = name_seq[-1] if name not in t: new_table = element_factory.create_table({}) self.append_elements([element_factory.create_table_header_element((array_name,) + name_seq), new_table]) return new_table def __delitem__(self, key): table_element_index = self._elements.index(self._navigable[key]) self._elements[table_element_index] = element_factory.create_table({}) self._on_element_change() def __setitem__(self, key, value): # Setting an array-of-tables if key and isinstance(value, (tuple, list)) and value and all(isinstance(v, dict) for v in value): for table in value: self.array(key).append(table) # Or setting a whole single table elif isinstance(value, dict): if key and key in self: del self[key] for key_seq, child_value in util.flatten_nested({key: value}).items(): self._setitem_with_key_seq(key_seq, child_value) # if key in self._navigable: # del self[key] # index = self._elements.index(self._navigable[key]) # self._elements = self._elements[:index] + [element_factory.create_table(value)] + self._elements[index+1:] # else: # if key: # self._elements.append(element_factory.create_table_header_element(key)) # self._elements.append(element_factory.create_table(value)) # Or updating the anonymous section table else: # It's mea self[''][key] = value self._on_element_change() def _detect_toplevels(self): """ Returns a sequence of TopLevel instances for the current state of this table. """ return tuple(e for e in toplevels.identify(self.elements) if isinstance(e, toplevels.Table)) def _update_table_fallbacks(self, table_toplevels): """ Updates the fallbacks on all the table elements to make relative table access possible. Raises DuplicateKeysError if appropriate. """ if len(self.elements) <= 1: return def parent_of(toplevel): # Returns an TopLevel parent of the given entry, or None. for parent_toplevel in table_toplevels: if toplevel.name.sub_names[:-1] == parent_toplevel.name.sub_names: return parent_toplevel for entry in table_toplevels: if entry.name.is_qualified: parent = parent_of(entry) if parent: child_name = entry.name.without_prefix(parent.name) parent.table_element.set_fallback({child_name.sub_names[0]: entry.table_element}) def _recreate_navigable(self): if self._elements: self._navigable = structurer.structure(toplevels.identify(self._elements)) def array(self, name): """ Returns the array of tables with the given name. """ if name in self._navigable: if isinstance(self._navigable[name], (list, tuple)): return self[name] else: raise NoArrayFoundError else: return ArrayOfTables(toml_file=self, name=name) def _on_element_change(self): self._recreate_navigable() table_toplevels = self._detect_toplevels() self._update_table_fallbacks(table_toplevels) def append_elements(self, elements): """ Appends more elements to the contained internal elements. """ self._elements = self._elements + list(elements) self._on_element_change() def prepend_elements(self, elements): """ Prepends more elements to the contained internal elements. """ self._elements = list(elements) + self._elements self._on_element_change() def dumps(self): """ Returns the TOML file serialized back to str. """ return ''.join(element.serialized() for element in self._elements) def dump(self, file_path): with open(file_path, mode='w') as fp: fp.write(self.dumps()) def keys(self): return set(self._navigable.keys()) | {''} def values(self): return self._navigable.values() def items(self): items = list(self._navigable.items()) def has_anonymous_entry(): return any(key == '' for (key, _) in items) if has_anonymous_entry(): return items else: return items + [('', self[''])] def get(self, item, default=None): return self._navigable.get(item, default) @property def primitive(self): """ Returns a primitive object representation for this container (which is a dict). WARNING: The returned container does not contain any markup or formatting metadata. """ raw_container = raw.to_raw(self._navigable) # Collapsing the anonymous table onto the top-level container is present if '' in raw_container: raw_container.update(raw_container['']) del raw_container[''] return raw_container def append_fresh_table(self, fresh_table): """ Gets called by FreshTable instances when they get written to. """ if fresh_table.name: elements = [] if fresh_table.is_array: elements += [element_factory.create_array_of_tables_header_element(fresh_table.name)] else: elements += [element_factory.create_table_header_element(fresh_table.name)] elements += [fresh_table, element_factory.create_newline_element()] self.append_elements(elements) else: # It's an anonymous table self.prepend_elements([fresh_table, element_factory.create_newline_element()]) @property def elements(self): return self._elements def __str__(self): is_empty = (not self['']) and (not tuple(k for k in self.keys() if k)) def key_name(key): return '[ANONYMOUS]' if not key else key def pair(key, value): return '%s = %s' % (key_name(key), str(value)) content_text = '' if is_empty else \ '\n\t' + ',\n\t'.join(pair(k, v) for (k, v) in self.items() if v) + '\n' return "TOMLFile{%s}" % content_text def __repr__(self): return str(self) PKbLp 1 def __str__(self): return '.'.join(self.sub_names) def __hash__(self): return hash(str(self)) def __eq__(self, other): return str(self) == str(other) def __ne__(self, other): return not self.__eq__(other) class AnonymousTable(TopLevel): def __init__(self, table_element): TopLevel.__init__(self, ('',), table_element) class Table(TopLevel): def __init__(self, names, table_element): TopLevel.__init__(self, names=names, table_element=table_element) class ArrayOfTables(TopLevel): def __init__(self, names, table_element): TopLevel.__init__(self, names=names, table_element=table_element) def _validate_file_elements(file_elements): pass def identify(file_elements): """ Outputs an ordered sequence of instances of TopLevel types. Elements start with an optional TableElement, followed by zero or more pairs of (TableHeaderElement, TableElement). """ if not file_elements: return _validate_file_elements(file_elements) # An iterator over enumerate(the non-metadata) elements iterator = PeekableIterator((element_i, element) for (element_i, element) in enumerate(file_elements) if element.type != elements.TYPE_METADATA) try: _, first_element = iterator.peek() if isinstance(first_element, TableElement): iterator.next() yield AnonymousTable(first_element) except KeyError: pass except StopIteration: return for element_i, element in iterator: if not isinstance(element, TableHeaderElement): continue # If TableHeader of a regular table, return Table following it if not element.is_array_of_tables: table_element_i, table_element = next(iterator) yield Table(names=element.names, table_element=table_element) # If TableHeader of an array of tables, do your thing else: table_element_i, table_element = next(iterator) yield ArrayOfTables(names=element.names, table_element=table_element) PK,wZLpoetry/utils/__init__.pyPK2QfL\ipoetry/utils/helpers.pyimport re _canonicalize_regex = re.compile('[-_.]+') def canonicalize_name(name: str) -> str: return _canonicalize_regex.sub('-', name).lower() def module_name(name: str) -> str: return canonicalize_name(name).replace('-', '_') PK2QfLpoetry/utils/toml_file.pyimport toml from pathlib import Path from poetry.toml import dumps from poetry.toml import loads from poetry.toml import TOMLFile class TomlFile: def __init__(self, path): self._path = Path(path) @property def path(self): return self._path def read(self, raw=False) -> dict: if raw: return toml.loads(self._path.read_text()) return loads(self._path.read_text()) def write(self, data) -> None: if not isinstance(data, TOMLFile): data = toml.dumps(data) else: data = dumps(data) self._path.write_text(data) def __getattr__(self, item): return getattr(self._path, item) PK][L|q poetry/utils/venv.pyimport glob import os import subprocess import sys class Venv: def __init__(self, venv=None): self._venv = venv @classmethod def create(cls) -> 'Venv': if 'VIRTUAL_ENV' not in os.environ: # Not in a virtualenv return cls() # venv detection: # stdlib venv may symlink sys.executable, so we can't use realpath. # but others can symlink *to* the venv Python, # so we can't just use sys.executable. # So we just check every item in the symlink tree (generally <= 3) p = os.path.normcase(sys.executable) paths = [p] while os.path.islink(p): p = os.path.normcase( os.path.join(os.path.dirname(p), os.readlink(p))) paths.append(p) p_venv = os.path.normcase(os.environ['VIRTUAL_ENV']) if any(p.startswith(p_venv) for p in paths): # Running properly in the virtualenv, don't need to do anything return cls() if sys.platform == "win32": venv = os.path.join( os.environ['VIRTUAL_ENV'], 'Lib', 'site-packages' ) else: lib = os.path.join( os.environ['VIRTUAL_ENV'], 'lib' ) python = glob.glob( os.path.join(lib, 'python*') )[0].replace( lib + '/', '' ) venv = os.path.join( lib, python, 'site-packages' ) return cls(venv) @property def venv(self): return self._venv @property def python(self) -> str: """ Path to current python executable """ return self._bin('python') @property def pip(self) -> str: """ Path to current pip executable """ return self._bin('pip') def run(self, bin: str, *args) -> str: """ Run a command inside the virtual env. """ cmd = [self._bin(bin)] + list(args) output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) return output.decode() def _bin(self, bin) -> str: """ Return path to the given executable. """ if not self.is_venv(): return bin return os.path.realpath( os.path.join(self._venv, '..', '..', '..', 'bin', bin) ) def is_venv(self) -> bool: return self._venv is not None PK2QfLtpoetry/vcs/__init__.pyfrom pathlib import Path from .git import Git def get_vcs(directory: Path): directory = directory.resolve() for p in [directory] + list(directory.parents): if (p / '.git').is_dir(): return Git() PK2QfLL/Tpoetry/vcs/git.pyimport re import subprocess class GitConfig: def __init__(self): config_list = subprocess.check_output( ['git', 'config', '-l'], stderr=subprocess.STDOUT ).decode() self._config = {} m = re.findall('(?ms)^([^=]+)=(.*?)$', config_list) if m: for group in m: self._config[group[0]] = group[1] def get(self, key, default=None): return self._config.get(key, default) def __getitem__(self, item): return self._config[item] class Git: def __init__(self): self._config = GitConfig() @property def config(self) -> GitConfig: return self._config def clone(self, repository, dest) -> str: return self.run('clone', repository, dest) def checkout(self, rev, folder) -> str: return self.run( '--git-dir', (folder / '.git').as_posix(), '--work-tree', folder.as_posix(), 'checkout', rev ) def rev_parse(self, rev, folder) -> str: return self.run( '--git-dir', (folder / '.git').as_posix(), '--work-tree', folder.as_posix(), 'rev-parse', rev ) def get_ignored_files(self) -> list: output = self.run( 'ls-files', '--others', '-i', '--exclude-standard' ) return output.split('\n') def run(self, *args) -> str: return subprocess.check_output( ['git'] + list(args), stderr=subprocess.STDOUT ).decode() PK|gL~G8poetry/version/__init__.pyimport operator from typing import Union from .exceptions import InvalidVersion from .legacy_version import LegacyVersion from .version import Version OP_EQ = operator.eq OP_LT = operator.lt OP_LE = operator.le OP_GT = operator.gt OP_GE = operator.ge OP_NE = operator.ne _trans_op = { '=': OP_EQ, '==': OP_EQ, '<': OP_LT, '<=': OP_LE, '>': OP_GT, '>=': OP_GE, '!=': OP_NE } def parse(version: str, strict: bool = False) -> Union[Version, LegacyVersion]: """ Parse the given version string and return either a :class:`Version` object or a LegacyVersion object depending on if the given version is a valid PEP 440 version or a legacy version. If strict=True only PEP 440 versions will be accepted. """ try: return Version(version) except InvalidVersion: if strict: raise return LegacyVersion(version) def version_compare(version1: str, version2: str, operator) -> bool: if operator in _trans_op: operator = _trans_op[operator] elif operator in _trans_op.values(): pass else: raise ValueError('Invalid operator') version1 = parse(version1) version2 = parse(version2) return operator(version1, version2) PK|gL0Dpoetry/version/base.pyclass BaseVersion: def __hash__(self): return hash(self._key) def __lt__(self, other): return self._compare(other, lambda s, o: s < o) def __le__(self, other): return self._compare(other, lambda s, o: s <= o) def __eq__(self, other): return self._compare(other, lambda s, o: s == o) def __ge__(self, other): return self._compare(other, lambda s, o: s >= o) def __gt__(self, other): return self._compare(other, lambda s, o: s > o) def __ne__(self, other): return self._compare(other, lambda s, o: s != o) def _compare(self, other, method): if not isinstance(other, BaseVersion): return NotImplemented return method(self._key, other._key) PK{gLXE%,,poetry/version/exceptions.pyclass InvalidVersion(ValueError): pass PKM~gL poetry/version/legacy_version.pyimport re from .base import BaseVersion class LegacyVersion(BaseVersion): def __init__(self, version): self._version = str(version) self._key = _legacy_cmpkey(self._version) def __str__(self): return self._version def __repr__(self): return "".format(repr(str(self))) @property def public(self): return self._version @property def base_version(self): return self._version @property def local(self): return None @property def is_prerelease(self): return False @property def is_postrelease(self): return False _legacy_version_component_re = re.compile( r"(\d+ | [a-z]+ | \.| -)", re.VERBOSE, ) _legacy_version_replacement_map = { "pre": "c", "preview": "c", "-": "final-", "rc": "c", "dev": "@", } def _parse_version_parts(s): for part in _legacy_version_component_re.split(s): part = _legacy_version_replacement_map.get(part, part) if not part or part == ".": continue if part[:1] in "0123456789": # pad for numeric comparison yield part.zfill(8) else: yield "*" + part # ensure that alpha/beta/candidate are before final yield "*final" def _legacy_cmpkey(version): # We hardcode an epoch of -1 here. A PEP 440 version can only have a epoch # greater than or equal to 0. This will effectively put the LegacyVersion, # which uses the defacto standard originally implemented by setuptools, # as before all PEP 440 versions. epoch = -1 # This scheme is taken from pkg_resources.parse_version setuptools prior to # it's adoption of the packaging library. parts = [] for part in _parse_version_parts(version.lower()): if part.startswith("*"): # remove "-" before a prerelease tag if part < "*final": while parts and parts[-1] == "*final-": parts.pop() # remove trailing zeros from each series of numeric parts while parts and parts[-1] == "00000000": parts.pop() parts.append(part) parts = tuple(parts) return epoch, parts PKw}gL poetry/version/utils.pyclass Infinity(object): def __repr__(self): return "Infinity" def __hash__(self): return hash(repr(self)) def __lt__(self, other): return False def __le__(self, other): return False def __eq__(self, other): return isinstance(other, self.__class__) def __ne__(self, other): return not isinstance(other, self.__class__) def __gt__(self, other): return True def __ge__(self, other): return True def __neg__(self): return NegativeInfinity Infinity = Infinity() class NegativeInfinity(object): def __repr__(self): return "-Infinity" def __hash__(self): return hash(repr(self)) def __lt__(self, other): return True def __le__(self, other): return True def __eq__(self, other): return isinstance(other, self.__class__) def __ne__(self, other): return not isinstance(other, self.__class__) def __gt__(self, other): return False def __ge__(self, other): return False def __neg__(self): return Infinity NegativeInfinity = NegativeInfinity() PK{}gL-Z*poetry/version/version.pyimport re from collections import namedtuple from itertools import dropwhile from .base import BaseVersion from .exceptions import InvalidVersion from .utils import Infinity _Version = namedtuple( "_Version", ["epoch", "release", "dev", "pre", "post", "local"], ) VERSION_PATTERN = re.compile(""" ^ v? (?: (?:(?P[0-9]+)!)? # epoch (?P[0-9]+(?:\.[0-9]+)*) # release segment (?P
                                          # pre-release
            [-_.]?
            (?P(a|b|c|rc|alpha|beta|pre|preview))
            [-_.]?
            (?P[0-9]+)?
        )?
        (?P                                         # post release
            (?:-(?P[0-9]+))
            |
            (?:
                [-_.]?
                (?Ppost|rev|r)
                [-_.]?
                (?P[0-9]+)?
            )
        )?
        (?P                                          # dev release
            [-_.]?
            (?Pdev)
            [-_.]?
            (?P[0-9]+)?
        )?
    )
    (?:\+(?P[a-z0-9]+(?:[-_.][a-z0-9]+)*))?       # local version
    $
""", re.IGNORECASE | re.VERBOSE)


class Version(BaseVersion):

    def __init__(self, version):
        # Validate the version and parse it into pieces
        match = VERSION_PATTERN.match(version)
        if not match:
            raise InvalidVersion("Invalid version: '{0}'".format(version))

        # Store the parsed out pieces of the version
        self._version = _Version(
            epoch=int(match.group("epoch")) if match.group("epoch") else 0,
            release=tuple(int(i) for i in match.group("release").split(".")),
            pre=_parse_letter_version(
                match.group("pre_l"),
                match.group("pre_n"),
            ),
            post=_parse_letter_version(
                match.group("post_l"),
                match.group("post_n1") or match.group("post_n2"),
            ),
            dev=_parse_letter_version(
                match.group("dev_l"),
                match.group("dev_n"),
            ),
            local=_parse_local_version(match.group("local")),
        )

        # Generate a key which will be used for sorting
        self._key = _cmpkey(
            self._version.epoch,
            self._version.release,
            self._version.pre,
            self._version.post,
            self._version.dev,
            self._version.local,
        )

    def __repr__(self):
        return "".format(repr(str(self)))

    def __str__(self):
        parts = []

        # Epoch
        if self._version.epoch != 0:
            parts.append("{0}!".format(self._version.epoch))

        # Release segment
        parts.append(".".join(str(x) for x in self._version.release))

        # Pre-release
        if self._version.pre is not None:
            parts.append("".join(str(x) for x in self._version.pre))

        # Post-release
        if self._version.post is not None:
            parts.append(".post{0}".format(self._version.post[1]))

        # Development release
        if self._version.dev is not None:
            parts.append(".dev{0}".format(self._version.dev[1]))

        # Local version segment
        if self._version.local is not None:
            parts.append(
                "+{0}".format(".".join(str(x) for x in self._version.local))
            )

        return "".join(parts)

    @property
    def public(self):
        return str(self).split("+", 1)[0]

    @property
    def base_version(self):
        parts = []

        # Epoch
        if self._version.epoch != 0:
            parts.append("{0}!".format(self._version.epoch))

        # Release segment
        parts.append(".".join(str(x) for x in self._version.release))

        return "".join(parts)

    @property
    def local(self):
        version_string = str(self)
        if "+" in version_string:
            return version_string.split("+", 1)[1]

    @property
    def is_prerelease(self):
        return bool(self._version.dev or self._version.pre)

    @property
    def is_postrelease(self):
        return bool(self._version.post)


def _parse_letter_version(letter, number):
    if letter:
        # We consider there to be an implicit 0 in a pre-release if there is
        # not a numeral associated with it.
        if number is None:
            number = 0

        # We normalize any letters to their lower case form
        letter = letter.lower()

        # We consider some words to be alternate spellings of other words and
        # in those cases we want to normalize the spellings to our preferred
        # spelling.
        if letter == "alpha":
            letter = "a"
        elif letter == "beta":
            letter = "b"
        elif letter in ["c", "pre", "preview"]:
            letter = "rc"
        elif letter in ["rev", "r"]:
            letter = "post"

        return letter, int(number)
    if not letter and number:
        # We assume if we are given a number, but we are not given a letter
        # then this is using the implicit post release syntax (e.g. 1.0-1)
        letter = "post"

        return letter, int(number)


_local_version_seperators = re.compile(r"[._-]")


def _parse_local_version(local):
    """
    Takes a string like abc.1.twelve and turns it into ("abc", 1, "twelve").
    """
    if local is not None:
        return tuple(
            part.lower() if not part.isdigit() else int(part)
            for part in _local_version_seperators.split(local)
        )


def _cmpkey(epoch, release, pre, post, dev, local):
    # When we compare a release version, we want to compare it with all of the
    # trailing zeros removed. So we'll use a reverse the list, drop all the now
    # leading zeros until we come to something non zero, then take the rest
    # re-reverse it back into the correct order and make it a tuple and use
    # that for our sorting key.
    release = tuple(
        reversed(list(
            dropwhile(
                lambda x: x == 0,
                reversed(release),
            )
        ))
    )

    # We need to "trick" the sorting algorithm to put 1.0.dev0 before 1.0a0.
    # We'll do this by abusing the pre segment, but we _only_ want to do this
    # if there is not a pre or a post segment. If we have one of those then
    # the normal sorting rules will handle this case correctly.
    if pre is None and post is None and dev is not None:
        pre = -Infinity

    # Versions without a pre-release (except as noted above) should sort after
    # those with one.
    elif pre is None:
        pre = Infinity

    # Versions without a post segment should sort before those with one.
    if post is None:
        post = -Infinity

    # Versions without a development segment should sort after those with one.
    if dev is None:
        dev = Infinity

    if local is None:
        # Versions without a local segment should sort before those with one.
        local = -Infinity
    else:
        # Versions with a local segment need that segment parsed to implement
        # the sorting rules in PEP440.
        # - Alpha numeric segments sort before numeric segments
        # - Alpha numeric segments sort lexicographically
        # - Numeric segments sort numerically
        # - Shorter versions sort before longer versions when the prefixes
        #   match exactly
        local = tuple(
            (i, "") if isinstance(i, int) else (-Infinity, i)
            for i in local
        )

    return epoch, release, pre, post, dev, local
PKZbL'"poetry/version/version_selector.pyimport re
from typing import Union

from poetry.packages import Package
from poetry.semver.comparison import less_than
from poetry.semver.helpers import normalize_version
from poetry.semver.version_parser import VersionParser


class VersionSelector(object):

    def __init__(self, pool, parser=VersionParser()):
        self._pool = pool
        self._parser = parser

    def find_best_candidate(self,
                            package_name: str,
                            target_package_version: Union[str, None] = None
                            ) -> Union[Package, bool]:
        """
        Given a package name and optional version,
        returns the latest Package that matches
        """
        if target_package_version:
            constraint = self._parser.parse_constraints(target_package_version)
        else:
            constraint = None

        candidates = self._pool.find_packages(package_name, constraint)

        if not candidates:
            return False

        # Select highest version if we have many
        package = candidates[0]
        for candidate in candidates:
            # Select highest version of the two
            if less_than(package.version, candidate.version):
                package = candidate

        return package

    def find_recommended_require_version(self, package):
        version = package.version

        return self._transform_version(version, package.pretty_version)

    def _transform_version(self, version, pretty_version):
        # attempt to transform 2.1.1 to 2.1
        # this allows you to upgrade through minor versions
        try:
            parts = normalize_version(version).split('.')
        except ValueError:
            return pretty_version

        # check to see if we have a semver-looking version
        if len(parts) == 4 and re.match('^0\D?', parts[3]):
            # remove the last parts (the patch version number and any extra)
            if parts[0] == '0':
                del parts[3]
            else:
                del parts[3]
                del parts[2]

            version = '.'.join(parts)
        else:
            return pretty_version

        return f'^{version}'
PK!H$,-poetry-0.4.0.post1.dist-info/entry_points.txtN+I/N.,()*O-)PVPi<.PK!H#SS"poetry-0.4.0.post1.dist-info/WHEELHM
K-*ϳR03rOK-J,/R(O-)T03zd&Y)r$U&UrPK!H$I,T%poetry-0.4.0.post1.dist-info/METADATA\Ƒb*nHl9<IU'*% 8$,:+*/pqo'_w\WlR2	GOOO?/MO*ɔ6->7Ҍ0UQ͛~t
[zHT"S2F/<+SxsmbWh6ҋZɉVqb8Q/m^>lE9vA;9H_Lb[&dDjhq53/uZ;Uˋ,t֎'Ylm:K
fyU2^.iu/|^cEz0Wx4H')SOi>m7,RKi}!ﴮe&{2F-WjXE$Egw&[Y)j$K3pOJyg)5V3UY`4E]Hjat͘\ksczaJ<L#XdsoaSG$:ϺMÄP'H6ܮL+^DzMO!j!*Y&#ut:cňt<uYԖL'E5i	)jU$ź=LkjT-i34-&e:1SM<q[Ve餄5)yT21l^ێcɃUƲmpRW`G&]7-r׋4cT
,ɠ1[ĠYW9\߳:)EAb ,!N^AVi8R:IXIin&}	ro_FӓmDph}oN>2?R+=BPfwnf)O	t`myBNʏC~#~yr7EeFǬFPPnղ祶YN *͈"BSІ,4`li<&Lc51i5w_~K::UYLp'Sc-t]QDoUlW}ʯ*lk\"7PcLOr~`㑜 q5F{zP[MɥP
_\SZ>&6/7;h	nF=QĖ&1
^:N1Py瘸<}8ܥ4!:cv`h
G6tOy ӸyeY/
f+0.Syswgdi'aNrp9rpCpxN!5o$nJA!&)M5Sp
OdJ<3mf^Y+i&lo޳	3a;:`4~ۣntIΗ)m
뛴@oBUvZ&I]&›
&q|vyVL`;vؠ+b
0pj(̸JIWO6zrIbjOvtH@wͿ(A6Z@?-{|ܫi` XԼ!- $5zMxUh%@3BL
рN9&rg7ȯKuBޮa8$ВTs6†WB)V<6lj<ܢ͝\tE3b*ssC?:bX?ZCɡ^hoԿ	&Lf		!i$[htwpX	d.)W_:q=zz$N+V,dlv6s|cݞ
(X\nb,`0I3nIh`%ҎƱo86 O^<'jn?,,AE(\WGUzGko.c5FD?є}ZBUZd%i
9	,m67VS_
|ƛǸ_1Ǜ(Fa"0<`R)DDeg
HF|状"HG[v2$k5)sy=8=)O%d:憽0i8j¾eKt+aYI롏5~Їg_FG}
r8350b_+TRz0ٗ9#>6Jmg
0#nPS㱓M2uAST70'8I.MSz|F$Y=SA7NkRQ༅_sh9$AVj1z51+k3m"FǺ@	f16a((`%*tJ?PnESXnX+dE>73 `IٜyֆQ	bJfR)t}Sb=hRjP
l]hI1@ 6A)ɜX
Ł7l(H">jauX]r䕧!}k)BdЙ
cO {4l@B4:bMJ=ᰪAaphN HrE| B_N9qBy0i0($|R*SdmY-F)0I1غݣ/?헟n$n륋PPoh=&?SxQ7۠>Q=/+p!]cSvmVZwCN-56nCkgO7{=툨`@ǜ%All ,nc[!'ʝ"74WV#pEvvJ
Qʶ@qx
KK>K6؉if+fj3QwP{8`3wڈxz13
Xz߼ֆb-EFDIao)VBCtlXn-!.d}D("ُáthч{&'Ni?
=%g,xkLb&2zC\Յ
;Ό<micYaZmݸ^wq9Gһǰz!=5Bbj[ҪJS%Ϸ
Jn,NLoM꽝cQYJFoQ-(q!oR%ݔ*׺ӑrɝI4|L1kRӳ9+yUi	෌"EZ)u"6X߸@㢮Y*9@Utq`׸BuN4?
4ٷP߉dT\,)(aB"л"ӥϨ1{X.JO$r`z€.=:@k~Μ
'7HSڴa3%yevKH
Bf5L9!V*2egWS"EtJևJ\8un5f	n4G,UHSX6)=O㣈G%cM@'\ǔe	g1yAKpӹ$o5daJi
_EMikK?&׮71-9ՎXtV^M[ao^jrNv/H
G)Aω|Mjj*5S6p%F#޶zVڑYQ4MD,OB:o.3ִHDZTlҞB71wa5Lj짝
$S!ulVLT
2MxTRB֝%o9Ҧ}NءZ*	~CVHe4/:"`5:~](An7qɩW:֐Rz_5 *:=QA~
xBrd6i9hz[勪&+tDڤBqgaLSNa֟K;1j)>Uhڄ1|
&&Pw'#}|Lw1Nrjo0$ZS+נ_%"tJΔ{1qs9)(b[rWd,^+12םnu!$QGܔ޳,(ga0:Rwa)}ICU[>h_/Y}:{]5-]z	@v#K+~b,q*WO]#t[r5#/N%]MoYyXH)a'઎f.e@x&|z)`L[JZXKR9R"- a<׌Pɭ-$V8TfJg/cX&6}cn+E9"
Ef,muDJUvrZ%]n3n(!gWo}$	W@q_x6܊tI⤎*@mw|m[ia}m{5vD]i(@ұ=
sn;Sb*54S?>?-( J!
(Fl6cUdm[aD|쓺a0gíde".Y#VK22\*	I!3r=jj0/o_xJg鿮hG[W<?y9&''P>c5cocPz-k2@aV̓nvhU}; XSĝQuW.҃]^3m1CkcK!x怬D=~߹H+e4VQ41E߷O"~BUan4Cf-32?cdJvκe[iKL+szPvQ
G23"~0%lJJeCQ2?+ˢ^|FrAE	5I*ѓ;CGCq9}B)CmVbT-VWOEw\ayLrsSVBf2tʷ/:do΅pmT#zheE&5U/QA
^'3 =<}HNyו\߬r#vpVΥIo56ň;Yk>TƕZ~/̡p˥`ĿCXqÉ+[Tr#ꡊX6:rd6iUp:./+h.=9P5M{(W)4˕}Sɾ~PzxM͏Qh׾ãOgͯN;9)@w)?+!p?î>=n(OnY0,g}k:__Q
.ӕ5]K'ٗקŸKt+4@K=JDRzMbؕV3vuVqgr Ą1| gxIVRb;MxmHv~F.^.Mkwؕؑ7u
jwoA~,B0NЍ91s[ọ+7PXN߃_veQ]uEOEm;"`w'"am=*47%VTU~#4V/.'i@uc_ڦKtpWVnSom6Կ(OOBDgfQ-3Ij)\!fTKۡw72sYa>~SB@i_j
%PK!HpN5#poetry-0.4.0.post1.dist-info/RECORDɒزYDfpH4$b}'xKTef("I˗jpuJYv}X9*I):1Jͤ/P`.J 7i
>/08LMU(	=Ne.Fcg@duE:V|jPW(v:ب	(!ݦ)R63Љz	rr<	[
tVvN0ӯҭȑ/+xT6B`Z+ӍلpewE^=O\Sh%hOFq0A\EMU
RADe!,c]7RzbP[~@bF8wB7X^\OEOeS'zg!2D磝dP8SD,GdVNaKuj;\Ei	832)a=^+T׹1wKN~p	jYl˖fzGd<a$T}C߻ShMEQMA:",aPw*Fc#r?ekt97Jw6W}򄾤}rh)['\ڽNsX5q^b\
tG]XSN}j{Y(_Ϻ
WMcb$?rϾ3AK]4H.@s)US
;n0*#1}M*@9KI`fkC=`KB #&a)—)%Vm2򸿺	`!PKL>7iHr|4NZc9!kFVA)
!(3+[.ܠzv!Zp.vͬ9؃CCTCד{b{0	;ԝOHNv5}=N3EM~5N&,RGbEIV<@w/0_n^4˭8I^kμ?jŤb(~BXzA!	_M|їzΙ.
HpL<гG(=!G޷_j,	7񟿟x%}ݤ^O]KWSOmFs)
D4޼R<)]G}jwB^Q8Gݟ"+;jwxo<(ݾ^Fx",˦,|wWms_Kqi1hBr
%ZmWM+z&\ ǫAiCn){9=#Ci@03:W'.RB,Ag)	k3t4q."﷨#0;'3Rh-|:&@UҴNvK1D0lj[X<qž.cKk2wnڹHϛ>ꢎ_G9A,j+~1!)؏WoPwZde*I"3_s 1u]䤾C~G+йr8M'z=)%0ӇhTԫk_	4T;MWOlDC AY5ͅٺM5sWRc{Im+/k0鹠ÛzNr	gDB7F˂|oR
&pNܹs[J\Fv2ŝ<YXCT0!~|ɭŅ{Lo[!/ 3<=.MCh	o3{G+vsשHKvre.0cމAa(;c	8t98|R7]iR4_욢u߭0`50z \e
!|q܍_O,';+rh=
pJ\TdDm
n,q5`"Bs0ju01k4΂7l,:hӞ(#w?~dr)V%crb蹕Fd`4VW+iz4kUլNd@uX Ht#9~mA!L'}@,q96&hք8dl汾16/RI5*Mby%-)LnG?|n_h:xhu%E{LpJt4
	"shJ'3=ݼeaڕҶ/}k8OƘA{O'M+SʷetځA{3ȗ,ȎW`k%̼Aݲ3P7ei;]DuzzHM%4y8sЦ߫xC7"XGM;ݢTN5X]`25Lxa5P4] ľϖťX'#Rýx1r|=7$T~P|_I쿔!=mVkd>qJ>aX=(x)s^']'s}OV#lɈhouA(gj{nh!5uGblL| NMx3K	0b!~5O*
4A$}AF[eY]
g-sҶD«pE8_ ))uJ?.|%6a/Q|*{l5KX,*UߚF,l9'oD/p<`W)	%MտZɕ	|SPz3U憞){`dUyܶJ>faf[D7eȫ#`&
YF-g@w}7^>`qZ\b).0Bxh/Z+QnoLI'o㑲ЗE"ޟIŏ4JB~{ŕo%1\f)"k$t<#7Qq&X4B!t=Hs>EBY;#6]WIpWCL'ԁ/fCHJLKJ`33>z
&tIjpU^D1KU`"~.VEZcqR4L#۲I{].$ݫ{]0!w0P}lgdq*ۡD]q$W8& LU8޽/HFvpkΨC0ԅORf=^[ZU`kfُA!܏侬G,ᆦS#sNQ$ajG.XkMď}>z2Vq^=FuwU?jC$ձZ6^S+cO޶|_yo6E["sKbY{Fm4isr+͹PxsQ)LjؕD2*Y4yI5D{y%F"%{TyWBN` 2UZ>[:ԺlUS~}r8DmCn0NzM_`O&Pa'Z( ]amOsh'v2nat.
B3@6C1SF%g\Cuʉ.
~||#7~Cn\r8;ɀ6#_fgj4>_Ő2
rз(;%	"a'J{jB+B1ޡcsuDz"Bi`4DU1䛜,^{xѢ&b>%W,/5r3w~K
(I湰Xt]^҃>VE;ap
-9薭o?;zW½JO&^(gf9}cCI.+sw Pv_:ZAW2&}-F\+]ΫuǏî۟ZBkX
]8Qv^u[iq/MӂR3#nw߀MNE-@+H'e'Mmg߽::P)Sf{pV.Z/z}ߌ{?	SmF1XZu
GsMTp
y x3!M${	{3k&3?X8@BHƏ"KO!T}"'^0Wn6@FI
GekEϻ*[Ec6	{j۹H)=\a<F;Bt	"ݞ-TZ y38;^~pu;e[zqFjD݅Tbz
(z>F_ln-Y,uNܤ̕<ae'6QWrە߿x(( 	l"x
帟~旰g}vy?-DT]f.*h`{E(Z#W앺ۛۜ؛~,nP"9S]wnNYwYJiП>Bj~UT&iꏁ_c
L4צ(w8J}MJX)icoUfνY$ff#Dpoetry/masonry/builders/__init__.pyPKhL֯"poetry/masonry/builders/builder.pyPKXgL6:o#Jpoetry/masonry/builders/complete.pyPK!vhL Lpoetry/masonry/builders/sdist.pyPK	hLHb## Y2poetry/masonry/builders/wheel.pyPKuhLs#Vpoetry/masonry/metadata.pyPKUhL5k!!%]poetry/masonry/publishing/__init__.pyPKhL>**&K^poetry/masonry/publishing/publisher.pyPK2QfL poetry/masonry/utils/__init__.pyPK2QfLƨC88шpoetry/masonry/utils/helpers.pyPK2QfLD.gssFpoetry/masonry/utils/module.pyPKzWL;8xMMpoetry/mixology/__init__.pyPK