PK!indextools/__init__.pyPK!?!indextools/bed.pyfrom contextlib import contextmanager import csv import _csv from pathlib import Path import subprocess from typing import Callable, Iterable, Iterator, Optional, Sequence, Union, Tuple from indextools.intervals import GenomeInterval from xphyle import STDOUT, open_ from xphyle.utils import read_delimited class BedInterval(GenomeInterval): def __init__( self, contig: Union[int, str], start: int, end: int, name: Optional[str] = None, value: Optional[int] = None, strand: Optional[str] = ".", other: Optional[Sequence[str]] = None ) -> None: super().__init__(contig, start, end) self.name = name self.value = value self.strand = strand self.other = other @staticmethod def from_row(row: Sequence[str]): contig = row[0] start = int(row[1]) end = int(row[2]) name = row[3] if len(row) > 3 else None value = row[4] if len(row) > 4 else None strand = row[5] if len(row) > 5 else "." other = row[6:] if len(row) > 6 else None return BedInterval(contig, start, end, name, value, strand, other) def as_bed6( self, name: Optional[str] = None, value: Optional[int] = None, strand: str = None ) -> Tuple: return super().as_bed6( name or self.name, value or self.value, strand or self.strand ) def as_dict(self) -> dict: d = super().as_dict() if self.name: d["name"] = self.name if self.value: d["value"] = self.value if self.strand: d["strand"] = self.strand if self.other: d["other"] = self.other return d def iter_bed_intervals(bed_file: Path) -> Iterator[BedInterval]: """ Iterate over intervals in a BED file. Args: bed_file: Path to the BED file. Returns: Iterator over :class:`GenomeInterval` objects. """ for row in read_delimited(bed_file): yield BedInterval.from_row(row) def iter_bed_interval_groups( bed_file: Path, assume_collated: bool = False ) -> Iterator[Sequence[BedInterval]]: """ Iterate over groups of intervals in a BED file, where a group is a set of consecutive intervals that have the same name (i.e. value in column 4). Args: bed_file: Path to the BED file. assume_collated: Whether to assume that all intervals with the same name are consecutive in the BED file. Returns: Iterator over sequences of :class:`GenomeInterval` objects. """ ivls = iter_bed_intervals(bed_file) if not assume_collated: ivls = sorted(ivls, key=lambda i: i.name) cur_group = [] for ivl in ivls: if not cur_group or ivl.name == cur_group[-1].name: cur_group.append(ivl) else: yield cur_group cur_group = [] yield cur_group @contextmanager def bed_writer( outfile: Path, bgzip: bool = True, index: bool = True ) -> Iterator[_csv.writer]: """ Context manager that provides a writer for rows in a BED file. Args: outfile: The BED file to write. bgzip: Whether to bgzip the output file after it is closed. index: Whether to index the output file (must be bgzipped). Yields: A csv.writer object. """ with open_(outfile, "wt", compression=("bgzip" if bgzip else None)) as out: writer = csv.writer(out, delimiter="\t") yield writer if bgzip and index: # TODO: replace this with dxpy.sugar.run subprocess.check_call(["tabix", "-p", "bed", str(outfile)]) def write_intervals_bed( ivls: Union[Iterable[GenomeInterval], Iterable[Sequence[GenomeInterval]]], outfile: STDOUT, name_pattern: str = "Partition_{group}", extra_columns: Optional[Callable[[GenomeInterval], tuple]] = None, bgzip: bool = True, index: bool = True ): """ Write GenomeIntervals to a BED file. Args: ivls: The intervals to write to BED format. outfile: The output BED file. name_pattern: Pattern for naming rows in the BED file. This can be a format string with placeholders 'group' and 'row' having the current group and row numbers. extra_columns: Optional function that creates extra columns for each row in the bed file. Must return a tuple of the same length for every row. Missing values should be represented as "." rather than empty string or None. bgzip: Whether to bgzip the output file. index: Whether to tabix index the output file (must be bgzipped). """ with bed_writer(outfile, bgzip, index) as bed: row_ctr = 1 for group_ctr, group in enumerate(ivls, 1): def writewrow(ivl: GenomeInterval): name = name_pattern.format(group=group_ctr, row=row_ctr) row = ivl.as_bed6(name) if extra_columns: row += extra_columns(ivl) bed.writerow(row) if isinstance(group, Sequence): for group_ivl in group: writewrow(group_ivl) row_ctr += 1 else: writewrow(group) row_ctr += 1 def annotations_extra_columns( names: Optional[Sequence[str]] = None ) -> Callable[[GenomeInterval], tuple]: """ Creates a function that can be passed as the `extra_columns` argument to `write_intervals_bed` that generates extra columns from the interval's annotations. Args: names: Optional, the names of the annotations to extract as extra columns. Returns: Callable """ def extra_columns(ivl: GenomeInterval): return ivl.as_bed_extended(names) return extra_columns PK!indextools/console/__init__.py"""IndexTools command line interface. """ from indextools.console import ( partition, features, split, commands ) from indextools.regions import Regions, parse_region import autoclick as ac COMMON_SHORT_NAMES = { "primary": "i", "index": "I", "partitions_bed": "p", "pair": "P", "outfile": "o", "contig_sizes": "z", "region": "r", "exclude_region": "R", "contig": "c", "exclude_contig": "C", "targets": "t", "exclude_targets": "T" } def merge_short_names(d): sn = dict(COMMON_SHORT_NAMES) sn.update(d) return sn # Set global options ac.set_global("infer_short_names", False) ac.set_global("add_composite_prefixes", False) # Register conversion functions ac.conversion(decorated=parse_region) ac.composite_type( decorated=Regions, short_names=COMMON_SHORT_NAMES ) @ac.group() def indextools(): pass # Partition genomic regions based on read density estimated # from an index. indextools.command( decorated=partition.partition, types={ "slop": ac.DelimitedList(int) }, validations={ ("primary", "contig_sizes"): ac.defined_ge(1) }, short_names=merge_short_names({ "partitions": "n", "grouping": "g" }) ) # Count the features (e.g. reads, variants) in a primary file # within partitions (e.g. output by the 'partition' command). indextools.command( decorated=features.features ) # Split a primary file (e.g. BAM, VCF) into chunks based on # a partition BED file (e.g. output by the 'partition' command). indextools.command( decorated=split.split, types={ "slop": ac.DelimitedList(int) }, validations={ "slop": ac.SequenceLength(1, 2) }, short_names=merge_short_names({ "slop": "s" }) ) # Generate a list commands, one per partition, given a template # and a partition BED file (e.g. output by the 'partition' command). # indextools.command( # decorated=commands.commands # ) # # indextools.command( # decorated=run # ) PK!y  indextools/console/commands.pyfrom typing import Optional, Sequence import autoclick as ac from xphyle import open_ from indextools.bed import BedInterval, iter_bed_interval_groups, iter_bed_intervals def commands( template: str, primary: ac.ReadableFile, partitions_bed: ac.ReadableFile, group: bool = True, outfile: Optional[ac.WritableFile] = None, jinja2: bool = False ): """ Generate a list commands, one per partition, given a template and a partition BED file. Args: template: The command template. primary: The primary file to split. partitions_bed: The partitions BED file. group: Whether to group intervals by name. If not, one command is generated for each interval rather than each partition. outfile: The output file - a text file with one command per line. jinja2: Whether the template is in Jinja2 syntax. """ if jinja2: try: from jinja2 import Template tmpl = Template(template) tmpl_fn = tmpl.render except ImportError: raise ValueError("Jinja2 must be installed to use Jinja2 templates") else: tmpl_fn = template.format if group: itr = iter_bed_interval_groups(partitions_bed) def get_format_kwargs(ivls: Sequence[BedInterval]): return { "regions": ivls } else: itr = iter_bed_intervals(partitions_bed) def get_format_kwargs(ivl: BedInterval): return ivl.as_dict() with open_(outfile, "wt") as out: for rank, item in enumerate(itr): kwargs = { "primary": str(primary), "rank": rank, } kwargs.update(get_format_kwargs(item)) print(tmpl_fn(**kwargs), file=out) PK!==indextools/console/features.pyfrom typing import Optional import autoclick as ac import pysam from indextools.bed import GenomeInterval, iter_bed_intervals, write_intervals_bed from indextools.utils import replace_suffix def features( primary: ac.ReadableFile, partitions_bed: ac.ReadableFile, outfile: Optional[ac.WritableFile] = None, bgzip_outfile: bool = True, index_outfile: bool = True ): """ Counts the number of features per partition, given a primary file and a BED file (such as output by the `partition` command). Args: primary: The primary file. partitions_bed: The partition BED file. outfile: The output BED file. If not specified, the primary filename is used with a '_features' suffix. bgzip_outfile: Bgzip the output file. Set to False if 'outfile' is specified and it does not end with '.gz'. index_outfile: Tabix index the output file. Set to False if bgzip is disabled. """ if outfile is None: suffix = "_features.bed" if bgzip_outfile: suffix += ".gz" outfile = replace_suffix(primary, suffix) partitions_iter = iter_bed_intervals(partitions_bed) with pysam.AlignmentFile(primary, "rb") as bam: def count_interval(ivl: GenomeInterval): def check_read(read): """Check that a read starts within `ivl`. """ return ivl.start <= read.reference_start < ivl.end count = bam.count(*ivl.as_bed3(), read_callback=check_read) return count, write_intervals_bed( partitions_iter, outfile, extra_columns=count_interval, bgzip=bgzip_outfile, index=index_outfile ) PK!;Tindextools/console/partition.py""" TODO: allow for custom partition naming """ import pathlib from typing import Callable, List, Optional, cast from indextools.bed import write_intervals_bed from indextools.index import IntervalGrouping, group_intervals from indextools.intervals import GenomeInterval from indextools.regions import Regions from indextools.utils import References, split_path import autoclick as ac from ngsindex import IndexType, CoordinateIndex, resolve_index_file, parse_index def partition( primary: Optional[ac.ReadableFile] = None, index: Optional[ac.ReadableFile] = None, partitions: int = 100, grouping: IntervalGrouping = IntervalGrouping.CONSECUTIVE, outfile: Optional[ac.WritableFile] = None, annotation: Optional[List[str]] = None, contig_sizes: Optional[ac.ReadableFile] = None, regions: Optional[Regions] = None, bgzip_outfile: bool = True, index_outfile: bool = True, ): """ Determine a partitioning of the input file, such that all partitions are roughly the same volume. Args: primary: The file to partition. index: The index of the primary file. partitions: The number of partitions to generate. grouping: How to group intervals: 'NONE' - do not group intervals; 'CONSECUTIVE' - group consecutive intervals (including intervals on consecutive contigs); 'ROUND_ROBIN' - assign intervals to partitions in round-robin fashion. outfile: The output partition BED file. Defaults to '{file}_partitions.bed.gz'. annotation: Names of annotations to add in columns 7+ of the output BED file. Currently accepted values are: 'child_lengths' (comma-delimited list of lengths in bp of child intervals), 'child_volumes' (comma-delimited list of volumes of child intervals). contig_sizes: A file with the sizes of all the contigs in the index; only necessary if the primary file is not specified, or if it does not have sequence information in its header. This is a two-column tab-delimited file ({contig_name}\t{contig_size}). regions: Regions to which the partitions are restricted. bgzip_outfile: Bgzip the output file. Set to False if 'outfile' is specified and it does not end with '.gz'. index_outfile: Tabix index the output file. Set to False if bgzip is disabled. """ index_file = resolve_index_file(primary, IndexType.BAI, index) if outfile is None: prefix = split_path(primary)[1] outfile_name = f"{prefix}.bed" if bgzip_outfile: outfile_name += ".gz" outfile = pathlib.Path.cwd() / outfile_name elif outfile.suffix != ".gz": bgzip_outfile = False if not bgzip_outfile: index_outfile = False if contig_sizes: references = References.from_file(contig_sizes) else: references = References.from_bam(primary) partition_intervals = group_intervals( cast(CoordinateIndex, parse_index(index_file, IndexType.BAI)), references, num_groups=partitions, grouping=grouping, regions=regions ) extra_columns = None if annotation: fns = [ANNOTATION_FUNCTIONS[name] for name in annotation] def extra_columns(ivl: GenomeInterval) -> tuple: return tuple(fn(ivl) for fn in fns) write_intervals_bed( partition_intervals, outfile, bgzip=bgzip_outfile, index=index_outfile, extra_columns=extra_columns ) def child_apply(fn: Callable): def _apply(ivl: GenomeInterval): children = ivl.annotations.get("child_intervals", None) if children: return ",".join(str(fn(child)) for child in children) else: return "." return _apply ANNOTATION_FUNCTIONS = { "child_lengths": child_apply(lambda child: len(child)), "child_volumes": child_apply(lambda child: child.volume) } PK!<~ indextools/console/split.pyimport enum from functools import partial import os from typing import Optional, Tuple import autoclick as ac import pysam from indextools.bed import BedInterval, iter_bed_interval_groups from indextools.utils import References, split_path class FeatureInclusion(enum.Enum): """ Enumeration of options for determining which features to include for an interval. """ OVERLAP = lambda start, end, read: ( start < read.end and end > read.start ) """Include any feature that overlaps at least one base of the interval.""" CONTAIN = lambda start, end, read: ( read.start >= start and read.end <= end ) """Include any feature that is fully contained in the interval.""" START = lambda start, end, read: ( start <= read.start <= end ) """Include any feature whose starting position is within the interval.""" END = lambda start, end, read: ( start <= read.end <= end ) """Include any feature whose ending position is within the interval.""" def split( primary: ac.ReadableFile, partitions_bed: ac.ReadableFile, slop: Optional[Tuple[int, ...]] = None, features: FeatureInclusion = FeatureInclusion.OVERLAP, name_format: str = "{prefix}.{rank}.{ext}", output_dir: Optional[ac.WritableDir] = None, contig_sizes: Optional[ac.ReadableFile] = None ): """ Split a primary file based on partitions in a BED file. Args: primary: The primary file to split. partitions_bed: slop: Padding to add on each side of each interval. If a single value, the same value is used for both the left and right side of the interval; otherwise the left and right slop can be specified separately. features: name_format: output_dir: contig_sizes: A file with the sizes of all the contigs in the index; only necessary if the primary file is not specified, or if it does not have sequence information in its header. This is a two-column tab-delimited file ({contig_name}\t{contig_size}). """ if slop and len(slop) == 1: slop = slop[0], slop[0] path, prefix, exts = split_path(primary) ext = os.extsep.join(exts) if output_dir is None: output_dir = path if contig_sizes: references = References.from_file(contig_sizes) else: references = References.from_bam(primary) with pysam.AlignmentFile(primary, "rb") as bam: for rank, ivl_list in enumerate(iter_bed_interval_groups(partitions_bed)): partition_bam_filename = output_dir / name_format.format( rank=rank, prefix=prefix, ext=ext, **ivl_list[0].as_dict() ) with pysam.AlignmentFile(partition_bam_filename, "wb", bam) as out: def write_ivl_reads(ivl: BedInterval): contig, start, end = ivl.as_bed3() if slop: start = max(start - slop[0], 0) end = min(end + slop[1], references.get_size(contig)) reads = bam.fetch(contig, start, end) if features is not FeatureInclusion.OVERLAP: reads = filter(partial(features.value, start, end), reads) for read in reads: out.write(read) for i in ivl_list: write_ivl_reads(i) PK!E}TGGindextools/index.pyfrom enum import Enum import math import statistics from typing import Iterator, List, Optional, Sequence, Tuple, Union, Iterable from indextools.intervals import GenomeInterval, IVL from indextools.regions import Regions from indextools.utils import References from ngsindex import CoordinateIndex, Offset BGZF_BLOCK_SIZE = 2 ** 16 """Size of a BGZF block in bytes.""" INTERVAL_LEN = 2 ** 14 """Length of an index interval in bases.""" class IntervalGrouping(Enum): """Interval grouping strategies. """ NONE = 0 """Do not group intervals.""" CONSECUTIVE = 1 """Group together equal numbers of consecutive intervals.""" ROUND_ROBIN = 2 """Distribute intervals to groups in a round-robin fashion.""" class VolumeInterval(GenomeInterval): def __init__( self, contig: str, start: int, end: int, volume: Optional[int] = None, **kwargs ): super().__init__(contig, start, end, **kwargs) self.volume = volume def get_volume(self, start: Optional[int] = None, end: Optional[int] = None) -> int: if not start or start < self.start: start = self.start if not end or end > self.end: end = self.end return int(math.ceil(((end - start) / len(self)) * self.volume)) def add(self: "VolumeInterval", other: "VolumeInterval") -> "VolumeInterval": cmp = self.compare(other) if cmp[0] != 0: raise ValueError(f"Cannot merge non-overlapping intervals {self}, {other}") if cmp[3] == 1: return self elif cmp[2] == 1: return other else: if other.start < self.start: other_vol = other.get_volume(end=self.start) else: other_vol = other.get_volume(start=self.end) return VolumeInterval( self.contig, self.start, other.end, self.volume + other_vol, **self._merge_annotations(other) ) def subtract( self: "VolumeInterval", other: "VolumeInterval" = None ) -> Tuple[Optional["VolumeInterval"], Optional["VolumeInterval"]]: self.contig_equal(other) if other not in self: raise ValueError(f"Intervals do not overlap: {self}, {other}") left = right = None if other.start > self.start: left = VolumeInterval( self.contig, self.start, other.start, self.get_volume(end=other.start) ) if other.end < self.end: right = VolumeInterval( self.contig, other.end, self.end, self.get_volume(start=other.end) ) return left, right def slice( self: "VolumeInterval", start: Optional[int] = None, end: Optional[int] = None ) -> "VolumeInterval": if start is None or start < self.start: start = self.start if end is None or end > self.end: end = self.end volume = self.get_volume(start, end) return VolumeInterval(self.contig, start, end, volume) def split( self: "VolumeInterval", num_pieces: Optional[int] = 2, target_volume: Optional[int] = None ) -> Iterator["VolumeInterval"]: """Break up an interval into smaller pieces. Args: num_pieces: Number of pieces in which to split the interval. Ignored if `target_volume` is set. Defaults to 2. target_volume: Target volume of the pieces. """ if target_volume: num_pieces = int(math.ceil(self.volume / target_volume)) # Yield one interval per piece total_length = len(self) piece_length = int(math.ceil(total_length / num_pieces)) for ivl_start in range(self.start, self.end, piece_length): ivl_end = min(self.end, ivl_start + piece_length) ivl_vol = int(math.ceil( ((ivl_end - ivl_start) / total_length) * self.volume )) yield VolumeInterval(self.contig, ivl_start, ivl_end, ivl_vol) @staticmethod def merge_precomputed(intervals: Iterable[IVL], volume: int) -> "VolumeInterval": """ Merge a group of intervals for which the total volume has already been computed. Args: intervals: Intervals to merge. volume: Precomputed volume. Returns: New VolumeInterval. """ intervals = list(sorted(intervals)) contig = intervals[0].contig start = intervals[0].start end = intervals[-1].end return VolumeInterval(contig, start, end, volume) def as_bed6( self, name: Optional[str] = None, value: Optional[int] = None, strand: str = "." ) -> Tuple: if value is None: value = self.volume return super().as_bed6(name, value, strand) def as_dict(self) -> dict: d = super().as_dict() d["volume"] = self.volume return d class IndexInterval(VolumeInterval): """Mapping between a genomic interval and an offset in the linear index, including offset diffs from the previous interval. Args: ref_num: The reference index. ivl_num: The interval index. offset: The offset in the linear index. file_offset_diff: Difference in file offset from the previous IndexInterval. block_offset_diff: Difference in the block offset from the previous IndexInterval. contig_end: Whether this IndexInterval is the last one in a contig. """ def __init__( self, references: References, ref_num: int, ivl_num: int, offset: Offset, file_offset_diff: int, block_offset_diff: int, contig_end: bool, **kwargs ) -> None: contig_name, contig_len = references.reference_list[ref_num] start = ivl_num * INTERVAL_LEN super().__init__( contig_name, start, min(contig_len, start + INTERVAL_LEN), **kwargs ) self.ref_num = ref_num self.ivl_num = ivl_num self.offset = offset self.file_offset_diff = file_offset_diff self.block_offset_diff = block_offset_diff self.contig_end = contig_end @property def file_offset(self) -> int: return self.offset.file_offset @property def block_offset(self) -> int: return self.offset.block_offset def init_volume(self, compressed_block_size: int) -> int: """Estimate the volume of the interval (in bytes) and set the value of `self.volume`. First, divide ivl.file_offset_diff by compressed block size to estimate how many blocks are covered by the interval (minimum of 1). Then multiply by the uncompressed block size and add the block offset. Args: compressed_block_size: The estimated compressed size of a BGZF block. Returns: The estimated interval volume. """ if self.file_offset_diff == 0: self.volume = self.block_offset_diff else: num_blocks = max(1.0, self.file_offset_diff / compressed_block_size) self.volume = int(math.ceil( (num_blocks * BGZF_BLOCK_SIZE) + self.block_offset_diff )) return self.volume def __repr__(self) -> str: return ( f"{super().__repr__()} (ref_num={self.ref_num}, ivl_num={self.ivl_num}, " f"offset={self.offset}, file_offset_diff={self.file_offset_diff}, " f"block_offset_diff={self.block_offset_diff})" ) # TODO: does the last partition need special handling (i.e. due to unmapped reads)? # TODO: add annotations to the intervals with information about contained index # intervals. def group_intervals( index: CoordinateIndex, references: References, num_groups: int, grouping: IntervalGrouping = IntervalGrouping.CONSECUTIVE, regions: Optional[Regions] = None, **kwargs, ) -> Union[Sequence[VolumeInterval], Sequence[Sequence[VolumeInterval]]]: """Determine equal-volume intervals, and then group adjacent intervals together to create `num_groups` equal volume groups. Args: index: An Index object. references: References object. Only used if the index file can't be resolved and we need to generate uniform volume intervals across the genome. num_groups: Number of groups to return. grouping: Strategy for grouping intervals regions: Restrict intervals to these regions. kwargs: Additional arguments passed through to `iter_index_intervals.` Returns: List of size `num_groups`. If grouping == NONE, each group is a single interval. Otherwise, each group is a list of intervals. """ ivls = list(iter_index_intervals(index, references, **kwargs)) if regions: ivls = list(regions.intersect(ivls)) num_intervals = len(ivls) # With a large number of groups, there is the possibilty to have # fewer groups than intervals. Split all intervals in half until # num_intervals >= num_groups. while num_intervals < num_groups: new_intervals = [] for ivl in ivls: new_intervals.extend(ivl.split()) ivls = new_intervals num_intervals = len(ivls) if grouping == IntervalGrouping.NONE: return ivls elif grouping == IntervalGrouping.ROUND_ROBIN: # This code will distribute groups round-robin, so each partition will have # a large number of small groups return [ivls[n::num_groups] for n in range(num_groups)] elif grouping == IntervalGrouping.CONSECUTIVE: # This code will distribute groups blockwise, so each partition will have a # small number of large groups groups: List[List[VolumeInterval]] = [[] for _ in range(num_groups)] intervals_per_group = int(math.floor(num_intervals / num_groups)) remainder = num_intervals - (intervals_per_group * num_groups) cur_group = 0 cur_ivl: Optional[VolumeInterval] = None cur_group_ivl_count = 0 target_ivl_count = intervals_per_group + (remainder > 0) for i, ivl in enumerate(ivls): if cur_ivl and ivl.contig == cur_ivl.contig: cur_ivl = cur_ivl.add(ivl) else: if cur_ivl: groups[cur_group].append(cur_ivl) cur_ivl = ivl cur_group_ivl_count += 1 if cur_group_ivl_count >= target_ivl_count and cur_group < (num_groups - 1): if cur_ivl: groups[cur_group].append(cur_ivl) cur_ivl = None cur_group += 1 cur_group_ivl_count = 0 target_ivl_count = intervals_per_group + (cur_group < remainder) if cur_ivl: groups[cur_group].append(cur_ivl) return groups raise ValueError(f"Unsupported grouping: {grouping}") def iter_index_intervals( index: CoordinateIndex, references: References, batch_volume: int = None, batch_volume_coeff: float = 1.5, ) -> Iterator[VolumeInterval]: """Iterate over the intervals in the index and yield batch intervals that meet certain size criteria. Args: index: The pyngs.index.Index object. references: A list of reference sequences, in the same order they appear in the BAM header. Each item is a tuple (name, length). batch_volume: The target batch volume (in bytes). If None, this is estimated from the data. batch_volume_coeff: Multiplier for `batch_volume` to determine the maximum batch volume (1.5). Returns: Iterator over VolumeIntervals. Positions are zero-based, half-open, following the pysam convention. """ ivls = index_to_intervals(index, references) file_offset_diffs = list( i.file_offset_diff for i in ivls if i.file_offset_diff > 0 ) if len(file_offset_diffs) > 0: # We assume that the median difference in block offsets is about the # average compressed size C of a single 64 KB block. compressed_block_size = int(statistics.median(file_offset_diffs)) else: # If all the intervals are empty, it doesn't matter what the # compressed block size is - we just have to set it to something compressed_block_size = 1 # Compute interval volumes interval_volumes = [ivl.init_volume(compressed_block_size) for ivl in ivls] # If the user did not specify a batch volume, we use the median of the # interval volumes. if not batch_volume: batch_volume = statistics.median( ivol for ivol in interval_volumes if ivol != 0 ) # Group together intervals smaller than batch volume, and break up intervals # larger than batch volume. group: List[IndexInterval] = [] cur_ivl = ivls[0].ivl_num - 1 cur_volume = 0 max_batch_volume = batch_volume * batch_volume_coeff for interval, interval_volume in zip(ivls, interval_volumes): large_interval = interval_volume >= max_batch_volume # Yield the existing group if this interval is either large or not # consecutive with the previous interval. if group and (large_interval or (interval.ivl_num - 1) != cur_ivl): yield VolumeInterval.merge_precomputed(group, cur_volume) group = [] cur_volume = 0 if large_interval: # If the current interval is large, break it up and yield the pieces yield from interval.split(target_volume=batch_volume) else: # Yield if adding the current interval would make the existing # group too large if group and (cur_volume + interval_volume) > max_batch_volume: yield VolumeInterval.merge_precomputed(group, cur_volume) group = [] cur_volume = 0 group.append(interval) cur_volume += interval_volume cur_ivl = interval.ivl_num # Yield if this interval is the last one in a contig if interval.contig_end: yield VolumeInterval.merge_precomputed(group, cur_volume) group = [] cur_volume = 0 # Yield the last group if group: yield VolumeInterval.merge_precomputed(group, cur_volume) def index_to_intervals( index: CoordinateIndex, references: References ) -> List[IndexInterval]: """Reduce an index to a list of intervals. Args: index: The index to convert. references: Returns: A list of IndexIntervals. """ prev_interval = None prev_ref_num = None prev_ivl_num = None max_intervals = sum(int(math.ceil(r / INTERVAL_LEN)) for r in references.lengths) idxintervals: List[Optional[IndexInterval]] = [None] * max_intervals cur_interval = 0 for ref_num, ref in enumerate(index.ref_indexes): num_ivls = ref.num_intervals if num_ivls == 0: continue # The first chunks of the contig are likely to be centromeric or telomeric, # and thus empty ref_intervals = ref.intervals first_ivl_num = 0 while first_ivl_num < num_ivls and ref_intervals[first_ivl_num].empty: first_ivl_num += 1 if first_ivl_num == num_ivls: # There are no reads on the contig continue # To compute the volume of each interval, we have to know the offsets of the # next interval. This always puts us one interval in arrears, which means we # have to take care with the intervals at the ends of contigs. e.g. we need # the first interval in chr2 to compute the volume of the last interval in # chr1, so the IndexInterval object we create from processing the first # interval in chr2 needs to have the ref and interval numbers left over at # the end of chr1. if prev_interval is None: prev_interval = ref_intervals[first_ivl_num] prev_ref_num = ref_num prev_ivl_num = first_ivl_num first_ivl_num += 1 if first_ivl_num > 0: ref_intervals = ref_intervals[first_ivl_num:] for ivl_num, ivl in enumerate(ref_intervals, first_ivl_num): if ivl.empty: # the interval has no reads continue # Compute the difference in file and block offsets from the # previous interval. Remember that file offsets represent # compressed size and block offsets represent uncompressed size # from the start of the block. file_offset_diff = 0 if ivl.file_offset != prev_interval.file_offset: # Compute the difference from the previous file offset if # we've moved to a new block. file_offset_diff = ivl.file_offset - prev_interval.file_offset # To compute the difference in block offsets, we need to # subtract the first part of the prev block (prev_block_offset) # and then add the offset into the current block. block_offset_diff = ivl.block_offset - prev_interval.block_offset # Finally, add the interval to the list idxintervals[cur_interval] = IndexInterval( references, prev_ref_num, prev_ivl_num, ivl, file_offset_diff, block_offset_diff, prev_ref_num != ref_num, ) cur_interval += 1 prev_interval = ivl prev_ref_num = ref_num prev_ivl_num = ivl_num # Add a dummy interval to cap the last contig if prev_ref_num is not None: idxintervals[cur_interval] = IndexInterval( references, prev_ref_num, prev_ivl_num, prev_interval, 0, 0, True ) cur_interval += 1 return idxintervals[:cur_interval] PK!~p\\indextools/intervals.py"""This is a complete example of how a BAM index can be used to estimate the "density" of each genomic interval (16 kb size), where density is an approximate measure of the size in bytes (which, in turn, is roughly correlated with the number of reads). The primary use case is to partition a BAM file into N groups of intervals for parallel processing a BAM file across N threads. ``` from intervals import get_bam_partitions # Parallelize operations on a BAM file across many threads threads = 32 # Get partitions of the BAM file that are roughly equal partitions = get_bam_partitions(bam_file, num_groups=threads) # Submit parallel jobs for intervals in partitions: submit_job(process_bam, bam_file, intervals) ``` """ from collections import Sized import copy from enum import IntFlag import functools import itertools import operator from typing import ( Iterable, Iterator, Optional, Sequence, Tuple, Type, TypeVar, Union, cast, ) from indextools.utils import OrderedSet from ngsindex.utils import DefaultDict BGZF_BLOCK_SIZE = 2 ** 16 """Size of a BGZF block in bytes.""" INTERVAL_LEN = 2 ** 14 """Length of an index interval in bases.""" # Type aliases PositionTuple = Tuple[Union[int, str], int] IntervalComparison = Tuple[int, int, float, float] PositionComparison = Tuple[int, float, float] class Side(IntFlag): """Side flag for use with interval searching. """ LEFT = 1 RIGHT = 2 IVL = TypeVar("IVL", bound="GenomeInterval") class GenomeInterval(Sized): """An interval consists of a contig, start, and end position. Start and/or end may be None to signal that the interval extends to the end of the contig. Todo: How to merge annotations? """ def __init__( self, contig: Union[int, str], start: int, end: int, **kwargs ) -> None: if start is None: start = 0 if end <= start: raise ValueError(f"'end' must be >= 'start'; {end} <= {start}") self.contig = contig self.start = start self.end = end self.annotations = kwargs @property def region(self) -> str: return "{}:{}-{}".format(self.contig, self.start + 1, self.end) def __len__(self) -> int: return self.end - self.start def __contains__(self: IVL, other: Union[int, PositionTuple, IVL]) -> bool: """Does this interval overlap `other`? Args: other: Either an Interval or an int. If an int, assumed to be a position on the same contig as this sequence. """ if isinstance(other, GenomeInterval): cmp = self.compare(other) return cmp[0] == 0 and abs(cmp[2]) > 0 if isinstance(other, tuple): contig, pos = other if self.contig != contig: return False else: pos = cast(int, other) return self.start <= pos < self.end def __sub__(self: IVL, other: IVL) -> int: """Returns the distance between this interval and `other`. Negative value indicates that `other` is to the right, and positive value indicates that it is to the left. """ self.contig_equal(other) return self.compare(other)[1] def __lt__(self: IVL, other: IVL) -> bool: contig, diff, overlap1, overlap2 = self.compare(other) if contig < 0: return True elif contig > 0: return False elif min(float(diff), overlap1) < 0: return True else: return overlap1 == 1 and overlap2 < 1 def __eq__(self: IVL, other: IVL) -> bool: return ( self.contig == other.contig and self.start == other.start and self.end == other.end ) def __hash__(self) -> int: return hash((self.contig, self.start, self.end)) def __repr__(self) -> str: return f"{self.contig}:{self.start}-{self.end}" def compare(self: IVL, other: IVL) -> IntervalComparison: """Rich comparison of intervals. Returns: A tuple consisting of 1) the comparison between this contig and other's contig; 2) the number of base pairs distance between this interval and `other`; and 3) the fraction of this interval overlapped by `other`, and 4) the fraction of `other` that overlaps this interval. Negative/positve numbers represents that one interval is to the left/right of the other. Examples: # End-inclusivity: a Interval is not end-inclusive, so an # interval whose end position is the same as the start position of # a second interval does *not* overlap that interval: i1 = Interval('chr1', 50, 100) i2 = Interval('chr1', 100, 200) cmp = i1.compare(i2) # => (0, -1, 0, 0) """ diff = 0 overlap = 0 if isinstance(other.contig, int): other_contig = cast(int, other.contig) else: other_contig = cast(str, other.contig) contig = self.contig if contig < other_contig: contig_cmp = -1 elif contig > other_contig: contig_cmp = 1 else: contig_cmp = 0 if self.start >= other.end: diff = (self.start + 1) - other.end elif self.end <= other.start: diff = self.end - (other.start + 1) elif self.start >= other.start: overlap = min(self.end, other.end) - self.start else: overlap = other.start - self.end other_len = other.end - other.start return ( contig_cmp, diff, min(1, overlap / len(self)), min(1, (-1 * overlap) / other_len) ) def contig_equal(self: IVL, other: IVL) -> None: if self.contig != other.contig: raise ValueError( f"Intervals are on two different contigs: " f"{self.contig} != {other.contig}" ) def add(self: IVL, other: IVL) -> IVL: """ Add another interval to this one. Args: other: The interval to add. Returns: A new GenomeInterval. """ cmp = self.compare(other) if cmp[0] != 0 or cmp[1] > 1: raise ValueError( f"Cannot merge non-overlapping/adjacent intervals {self}, {other}" ) return GenomeInterval( self.contig, min(self.start, other.start), max(self.end, other.end), **self._merge_annotations(other) ) def _merge_annotations(self: IVL, other: IVL) -> dict: annotations = copy.copy(self.annotations) or {} if "child_intervals" in annotations: annotations["child_intervals"].append(other) else: annotations["child_intervals"] = [self, other] return annotations def subtract(self: IVL, other: IVL = None) -> Tuple[Optional[IVL], Optional[IVL]]: self.contig_equal(other) if other not in self: raise ValueError(f"Intervals do not overlap: {self}, {other}") left = right = None if other.start > self.start: left = GenomeInterval(self.contig, self.start, other.start) if other.end < self.end: right = GenomeInterval(self.contig, other.end, self.end) return left, right def slice(self: IVL, start: Optional[int] = None, end: Optional[int] = None) -> IVL: if start is None or start < self.start: start = self.start if end is None or end > self.end: end = self.end return GenomeInterval(self.contig, start, end) @classmethod def intersect(cls: Type[IVL], ivl: IVL, *other: IVL) -> Iterator[IVL]: """ Intersect `ivl` with `other` intervals. Args: ivl: *other: Yields: Intervals of the same type as `ivl`. """ if len(other) == 0: raise ValueError("Must specify at least one other interval to intersect") other = list(sorted(other)) ivl.check_contig(other[0]) if len(other) == 1: other_list = other else: # First merge any of `other` intervals that are overlapping. other_list = [] o1 = other[0] for o2 in other[1:]: cmp = o1.compare(o2) if cmp[0] != 0: raise ValueError( f"Cannot intersect intervals on different contigs; " f"{o1.contig} != {o2.contig}" ) if cmp[1] <= 1: o1 = o1.add(o2) else: other_list.append(o1) o1 = o2 other_list.append(o1) for other_ivl in other_list: if other_ivl in ivl: yield ivl.slice(other_ivl) @classmethod def divide(cls: Type[IVL], ivl: IVL, *other: IVL) -> Iterator[IVL]: """ Generate new intervals by subtracting other from `ivl`. Args: ivl: *other: Yields: Intervals of the same type as `ivl`. Raises: ValueError if any of `other` intervals do not overlap. """ remaining = ivl for other in sorted(other): frag, remaining = remaining.subract(other) if frag: yield frag if remaining and len(remaining) > 0: yield remaining @classmethod def merge(cls: Type[IVL], intervals: Iterable[IVL]) -> IVL: """ Merge overlapping GenomeIntervals. Args: intervals: Intervals to merge Returns: A new interval of the same type as `self`. Raises: ValueError if any of the intervals do not overlap. """ intervals = list(sorted(intervals)) merged = intervals[0] for ivl in intervals[1:]: merged = merged.add(ivl) return merged def as_bed3(self): """ Returns this interval as a tuple in BED3 format. Returns: Tuple of length 3: (contig, start, end) """ return self.contig, self.start, self.end def as_bed6( self, name: Optional[str] = None, value: Optional[int] = None, strand: str = "." ) -> Tuple: """ Returns this interval as a tuple in BED6 format. Args: name: value: strand: Returns: Tuple of length 6: (contig, start, end, name, value, strand). """ if name is None: name = str(self) if value is None: value = len(self) return self.contig, self.start, self.end, name, value, strand def as_bed_extended(self, annotation_names: Optional[Sequence[str]], **kwargs): """ Returns this interval as a tuple with the first 6 columns being BED6 format and additional columns being annotations. Args: annotation_names: Optional list of annotation names for the extended columns. If specified, columns will be added in the specified order, and the empty value (".") used for missing columns. Otherwise, all annotations will be added in undetermined order. kwargs: Keyword arguments to `as_bed6`. Returns: Tuple of length 6+. """ bed = self.as_bed6(**kwargs) if annotation_names: bed += tuple(self.annotations.get(name, ".") for name in annotation_names) elif self.annotations: bed += tuple(self.annotations.values()) return bed def as_dict(self) -> dict: return { "contig": self.contig, "start": self.start, "end": self.end, "length": len(self), "region": self.region, "annotations": self.annotations } class Intervals: """Collection of InterLaps (one per contig). Args: intervals: Iterable of GenomeIntervals. interval_type: Type of Interval that will be added. If None, is auto-detected from the first interval that is added. allows_overlapping: Whether overlapping intervals can be added, or if overlapping intervals are merged. """ def __init__( self, intervals: Iterable[GenomeInterval] = (), interval_type: Type[GenomeInterval] = None, allows_overlapping: bool = True ) -> None: if interval_type is None: if intervals: intervals = list(intervals) interval_type = type(intervals[0]) else: raise ValueError( "Either 'interval_type' or 'intervals' must be specified." ) self.interval_type = interval_type self.interlaps = DefaultDict( default=functools.partial( InterLap, interval_type=interval_type, allows_overlapping=allows_overlapping ) ) if intervals: self.add_all(intervals) @property def contigs(self) -> Sequence[str]: return tuple(self.interlaps.keys()) def add_all(self, intervals: Iterable[GenomeInterval]) -> None: """Add all intervals from an iterable of GenomeIntervals. """ modified = set() for interval in intervals: self.interlaps[interval.contig].add(interval) modified.add(interval.contig) for contig in modified: self.interlaps[contig].commit() def find(self, interval: GenomeInterval) -> Iterator[GenomeInterval]: """Find intervals that overlap `interval`. """ contig = interval.contig if contig not in self.interlaps: return yield from self.interlaps[contig].find(interval) def intersect(self, interval: GenomeInterval) -> Iterator[GenomeInterval]: """Iterate over intersections with `interval`. Intersection is like find except that the yielded intervals include only the intersected portions. """ contig = interval.contig if contig not in self.interlaps: return yield from self.interlaps[contig].intersect(interval) def intersect_all( self, intervals: Iterable[GenomeInterval] ) -> Iterator[GenomeInterval]: """Iterate over intersections with all `intervals`. """ intersections = OrderedSet() for ivl in intervals: intersections.update(self.intersect(ivl)) yield from intersections def closest( self, interval: GenomeInterval, side: int = Side.LEFT | Side.RIGHT ) -> Iterator[GenomeInterval]: """Find the closest interval(s) to `interval. """ contig = interval.contig if contig not in self.interlaps: return yield from self.interlaps[contig].closest(other=interval, side=side) def __len__(self): return sum(len(ilap) for ilap in self.interlaps.values()) def __contains__(self, interval: GenomeInterval) -> bool: """Returns True if `interval` intersects any intervals. """ contig = interval.contig if contig not in self.interlaps: return False return interval in self.interlaps[contig] def __iter__(self) -> Iterator[GenomeInterval]: return itertools.chain(self.interlaps) # TODO: replace InterLap with one of # * https://github.com/brentp/quicksect # * https://github.com/hunt-genes/ncls/ # * https://biocore-ntnu.github.io/pyranges/ # * https://github.com/lh3/cgranges class InterLap: """Fast interval overlap testing. An InterLap is based on a sorted list of intervals. Resorting the list is only performed when `commit` is called. Overlap testing without first 'committing' any added intervals will probably yield incorrect results. Args: interval_type: Type of Interval that will be added. If None, is auto-detected from the first interval that is added. allows_overlapping: Whether overlapping intervals can be added, or if overlapping intervals are merged. See: Adapted from https://github.com/brentp/interlap. """ def __init__( self, interval_type: Optional[Type[GenomeInterval]] = None, allows_overlapping: bool = True ) -> None: self.interval_type = interval_type self.allows_overlapping = allows_overlapping self._iset = [] self._maxlen = 0 self._dirty = False def add( self, intervals: Union[GenomeInterval, Sequence[GenomeInterval]], commit: Optional[bool] = None ): """Add a single (or many) Intervals to the tree. Args: intervals: An interval or sequence of intervals. commit: Whether these additions should be immediately committed. """ if isinstance(intervals, GenomeInterval): intervals = [intervals] if self.interval_type is None: self.interval_type = type(intervals[0]) if self.allows_overlapping: self._iset.extend(intervals) self._dirty = True if commit: self.commit() elif commit is False: raise ValueError( "Cannot set commit=False for InterLaps in which overlapping " "intervals are not allowed." ) else: if self._dirty: self.commit() for ivl in intervals: overlapping = self.find(ivl) if overlapping: ovl_list = list(overlapping) for overlapping_ivl in ovl_list: self._iset.remove(overlapping_ivl) ovl_list.append(ivl) ivl = GenomeInterval.merge(ovl_list) self._iset.append(ivl) self._resort() def commit(self) -> None: """Commit additions to this InterLap. This just means updating the _maxlen attribute and resorting the _iset list. """ if self._dirty: self._resort() self._dirty = False def _resort(self): self._iset.sort() self._maxlen = max(len(r) for r in self._iset) def __len__(self) -> int: """Return number of intervals.""" return len(self._iset) def __iter__(self) -> Iterator[GenomeInterval]: return iter(self._iset) def __contains__(self, other: GenomeInterval) -> bool: """Indicate whether `other` overlaps any elements in the tree. """ left = InterLap.binsearch_left_start( self._iset, other.start - self._maxlen, 0, len(self._iset) ) # Use a shortcut, since often the found interval will overlap. max_search = 8 if left == len(self._iset): return False for left_ivl in self._iset[left:(left + max_search)]: if left_ivl in other: return True if left_ivl.start > other.end: return False r = InterLap.binsearch_right_end(self._iset, other.end, 0, len(self._iset)) return any(s in other for s in self._iset[(left + max_search):r]) def find(self, other: GenomeInterval) -> Iterator[GenomeInterval]: """Returns an iterable of elements that overlap `other` in the tree. """ left = InterLap.binsearch_left_start( self._iset, other.start - self._maxlen, 0, len(self._iset) ) right = InterLap.binsearch_right_end(self._iset, other.end, 0, len(self._iset)) iopts = self._iset[left:right] yield from (s for s in iopts if s in other) def intersect(self, other: GenomeInterval) -> Iterator[GenomeInterval]: """Like find, but the result is an iterable of new interval objects that cover only the intersecting regions. """ for ivl in self.find(other): pos = sorted((ivl.start, ivl.end, other.start, other.end)) yield self.interval_type(ivl.contig, pos[1], pos[2]) def closest( self, other: GenomeInterval, side: int = Side.LEFT | Side.RIGHT ) -> Iterator[GenomeInterval]: """Returns an iterable of the closest interval(s) to `other`. Args: other: The interval to search. side: A bitwise combination of LEFT, RIGHT. Yields: If side == LEFT or RIGHT, the single closest interval on the specified side is yielded. If side == LEFT | RIGHT, all intervals that are equidistant on the left and right side are yielded. """ left = None if side & Side.LEFT: left = max( 0, InterLap.binsearch_left_start( self._iset, other.start - self._maxlen, 0, len(self._iset) ) - 1, ) right = None if side & Side.RIGHT: right = min( len(self._iset), InterLap.binsearch_right_end( self._iset, other.end, 0, len(self._iset) ) + 2, ) if side == Side.LEFT | Side.RIGHT: # Expand candidates to include all left intervals with the same end # position and all right right intervals with the same start # position as the nearest. while left > 1 and self._iset[left].end == self._iset[left + 1].end: left -= 1 while ( right < len(self._iset) and self._iset[right - 1].start == self._iset[right].start ): right += 1 iopts = self._iset[left:right] ovls = [s for s in iopts if s in other] if ovls: # Yield all candidate intervals that overlap `other` yield from ovls else: # iopts = sorted([(abs(i - other), i) for i in iopts]) _, g = next(iter(itertools.groupby(iopts, operator.itemgetter(0)))) for _, ival in g: yield ival else: if side == Side.LEFT: ivl = self._iset[left] else: ivl = self._iset[right - 1] if ivl != other: yield ivl @staticmethod def binsearch_left_start( intervals: Sequence[GenomeInterval], x: int, lo: int, hi: int ) -> int: """Like python's bisect_left, but finds the _lowest_ index where the value x could be inserted to maintain order in the list intervals. """ while lo < hi: mid = (lo + hi) // 2 f = intervals[mid] if f.start < x: lo = mid + 1 else: hi = mid return lo @staticmethod def binsearch_right_end( intervals: Sequence[GenomeInterval], x: int, lo: int, hi: int ) -> int: """Like python's bisect_right, but finds the _highest_ index where the value x could be inserted to maintain order in the list intervals. """ while lo < hi: mid = (lo + hi) // 2 f = intervals[mid] if x < f.start: hi = mid else: lo = mid + 1 return lo PK!Ecllindextools/regions.pyfrom pathlib import Path import re from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union from .bed import iter_bed_intervals from .intervals import GenomeInterval, Intervals, IVL from .utils import References from autoclick import ValidationError REGION_RE = re.compile(r"[:-]") Region = Tuple[str, int, Union[str, int]] def parse_region(region_str: str) -> Region: """ Convert a region string into an interval. Args: region_str: Region string, such as 'chr1:100-1000' or '5:0-*'. Returns: A tuple (chr, start, end), where end may be an integer or '*' indicating the end of the contig. """ parts = REGION_RE.split(region_str) if len(parts) == 1: return parts[0], 0, "*" else: contig = parts[0] start = int(parts[1]) if len(parts) == 2: end = start else: end = int(parts[2]) if start <= 0: raise ValidationError( f"Invalid region interval {region_str}: start must be >= 1" ) start -= 1 if start >= end: raise ValidationError( f"Invalid region interval {region_str}: start must be <= end" ) return contig, start, end # TODO: add an invert option - not sure if makes sense to add this to the # constructor or make it a paramter to the methods. class Regions: """ Collection of genome regions. Args: region: A region string, e.g. chr1:10-100. exclude_region: A region to exclude. contig: A contig string (name, range, or macro). exclude_contig: A contig to exclude. targets: A BED file with targets to include. exclude_targets: A BED file with targets to exclude. """ def __init__( self, region: Optional[List[Region]] = None, exclude_region: Optional[List[Region]] = None, contig: Optional[List[str]] = None, exclude_contig: Optional[List[str]] = None, targets: Optional[Path] = None, exclude_targets: Optional[Path] = None ): self._include_regions = region self._exclude_regions = exclude_region self._include_contigs = contig self._exclude_contigs = exclude_contig self._include_targets = targets self._exclude_targets = exclude_targets self._references = None self._include_intervals = None self._exclude_intervals = None def init( self, references: References, macros: Optional[Dict[str, List[str]]] = None ): self._references = references if self._include_regions or self._include_contigs or self._include_targets: self._include_intervals = self._create_region_intervals( self._include_regions, self._include_contigs, self._include_targets, macros ) if self._exclude_regions or self._exclude_contigs or self._exclude_targets: self._exclude_intervals = self._create_region_intervals( self._exclude_regions, self._exclude_contigs, self._include_targets, macros ) def _create_region_intervals( self, regions: Optional[List[Region]] = None, contigs: Optional[List[str]] = None, targets: Optional[Path] = None, macros: Optional[Dict[str, List[str]]] = None ) -> Intervals: intervals = Intervals(allows_overlapping=False) if regions: intervals.add_all(self._create_interval(ivl) for ivl in regions) if contigs: if macros: while True: expanded = set() done = True for contig in contigs: if contig in macros: expanded.update(macros[contig]) done = False contigs = list(expanded) if done: break matchers = [] for contig in contigs: if "-" in contig: start_str, end_str = contig.split("-") i = 0 while start_str[i].isalpha(): i += 1 start = float(start_str[i:]) end = float(end_str[i:]) matchers.append(lambda c: start <= float(c[i:]) <= end) else: matchers.append(re.compile(contig).fullmatch) contigs = set() for ref in self._references.names: for matcher in matchers: if matcher(ref): contigs.add(ref) break if contigs: intervals.add_all( GenomeInterval(contig, 0, self._references[contig]) for contig in contigs ) if targets: intervals.add_all(iter_bed_intervals(targets)) return intervals def _create_interval(self, region: Region) -> GenomeInterval: contig, start, end = region if end == "*": if contig in self._references: end = self._references[contig] else: raise ValueError( f"Contig {contig} not found in references" ) return GenomeInterval(contig, start, end) def allows(self, interval: GenomeInterval) -> bool: """ Args: interval: Returns: True if 'interval' is fully contained within an included region (if any) and does not overlap any excluded region (if any). """ if self._include_intervals: contained = False overlapping = self._include_intervals.find(interval) for ivl in overlapping: if interval.compare(ivl)[2] == 1: contained = True else: contained = True if not contained or ( self._exclude_intervals and interval in self._exclude_intervals ): return False return True def iter_allowed(self) -> Iterator[GenomeInterval]: if self._include_intervals: interval_itr = self._include_intervals else: interval_itr = ( GenomeInterval(ref, 0, self._references[ref]) for ref in self._references.names ) if self._exclude_intervals: for ivl in interval_itr: overlapping = self._exclude_intervals.find(ivl) if overlapping: yield from GenomeInterval.divide(ivl, overlapping) else: yield ivl else: yield from interval_itr def intersect(self, intervals: Iterable[IVL]) -> Iterator[IVL]: """ Constrain intervals to the regions specified by this Regions object. Args: intervals: The intervals to constrain. Yields: Intervals that are in the intersection of `intervals` and this set of regions. """ for ivl in intervals: intersection = [] if self._include_intervals: incl_overlapping = self._include_intervals.find(ivl) if incl_overlapping: intersection = GenomeInterval.intersect(ivl, incl_overlapping) else: intersection = [ivl] if self._exclude_intervals: for subivl in intersection: excl_overlapping = self._exclude_intervals.find(subivl) if excl_overlapping: yield from GenomeInterval.divide(subivl, excl_overlapping) else: yield subivl else: yield from intersection PK!ѷ+xindextools/utils.pyimport os from pathlib import Path from typing import ( Dict, Generic, Iterable, Iterator, Sequence, Tuple, TypeVar, Union, cast, ) import pysam from xphyle.utils import read_delimited class FileFormatError(Exception): pass T = TypeVar("T") class OrderedSet(Generic[T]): """Simple set (does not implement the full set API) based on dict that maintains addition order. """ def __init__(self, items: Iterable[T] = None) -> None: self.items = {} if items: self.update(items) def add(self, item: T) -> None: self.items[item] = True def update(self, items: Iterable[T]) -> None: for item in items: self.items[item] = True def __iter__(self) -> Iterator[T]: return iter(self.items.keys()) class References: """Stores map of reference name to size, and provides access to reference_list in different formats. """ def __init__(self, references: Sequence[Tuple[str, int]]): self.reference_list = list(references) self._names = None self._lengths = None self._id_to_name = None self._name_to_size = None def __len__(self) -> int: return len(self.reference_list) def __contains__(self, ref: Union[str, int]): if isinstance(ref, int): name = self.id_to_name[cast(int, ref)] else: name = cast(str, ref) return name in self.name_to_size def __getitem__(self, item: Union[str, int]) -> int: return self.get_size(item) def __repr__(self) -> str: return f"References(" \ f"{','.join(f'{k}={str(v)}' for k, v in self.name_to_size.items())})" def get_size(self, ref: Union[str, int]) -> int: """Gets the size of a reference. Args: ref: A reference name or index. Returns: The size of the reference. """ if isinstance(ref, int): name = self.id_to_name[cast(int, ref)] else: name = cast(str, ref) return self.name_to_size[name] @property def names(self) -> Sequence[str]: """Sequence of reference names. """ if self._names is None: self._names = tuple(ref[0] for ref in self.reference_list) return self._names @property def lengths(self) -> Sequence[int]: """Sequence of reference lengths. """ if self._lengths is None: self._lengths = tuple(ref[1] for ref in self.reference_list) return self._lengths @property def name_to_size(self) -> Dict[str, int]: """Mapping of reference name to size. """ if self._name_to_size is None: self._name_to_size = dict(self.reference_list) return self._name_to_size @property def id_to_name(self) -> Dict[int, str]: """Mapping of reference index to reference name. """ if self._id_to_name is None: self._id_to_name = dict( (i, ref[0]) for i, ref in enumerate(self.reference_list) ) return self._id_to_name @staticmethod def from_file(path: Path) -> "References": """Loads references from a tsv file with two columns: ref_name, ref_size. Args: path: The path of the tsv file. Returns: A References object. """ return References(list(read_delimited(path, converters=[str, int]))) @staticmethod def from_bam(bam: Union[Path, pysam.AlignmentFile]) -> "References": """Loads references from the header of a BAM file. Args: bam: Either a path to a BAM file or an open pysam.AlignmentFile. Returns: A References object. """ def bam_to_references(_bam): return References(list(zip(_bam.references, _bam.lengths))) if isinstance(bam, Path): with pysam.AlignmentFile(str(bam), "rb") as bam_file: return bam_to_references(bam_file) else: return bam_to_references(bam) def split_path(path: Path) -> Tuple[Path, str, Sequence[str]]: """ Splits a path into parts. Args: path: The path to split Returns: Tuple of (basename, prefix, exts), where exts is a sequence of filename extensions. """ name = path.name prefix, suffix = name.split(os.extsep, 1) return path.parent, prefix, suffix.split(os.extsep) def replace_suffix(path: Path, new_suffix: str) -> Path: """ Replace the current suffix of `path` (everything from the first '.' on) with `new_suffix`. Args: path: new_suffix: Returns: """ parts = split_path(path) return parts[0] / (parts[1] + new_suffix) PK!H$>(<+indextools-0.1.1.dist-info/entry_points.txtN+I/N.,()KI()E0ʬB\\PK!Ο))"indextools-0.1.1.dist-info/LICENSEMIT License Copyright (c) 2019 DNAnexus Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTU indextools-0.1.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H@I47#indextools-0.1.1.dist-info/METADATAN0 } AZvcUĐ2z&Iڷ'- Sߟ߾': _X* %i]+j)Ѵ)̡Z< :V%Bmo \hIK?9^`9)CnM z`BU ۢW/ ڊ\j6dUYoy6d09QpUfmWF-Qw.ˋS\x׍3b .+wp2Fb,$NXwpZjAqi?PK!HJK'!indextools-0.1.1.dist-info/RECORD}˲:y? ~@:A ?tUw]d++ h q P,᷊Ӗ^<:Y].p5{4[=7FLa$-udmN KTL9flì 6=w19؟Ny}-%wwM" F`]'MޯXű3KrOp'F δvQ ޯWJ a@M{-4tŚ+fo U<΃ v=2cQQcq֜bat"qW0ߏYYuW ݘT"IGoB{)4efɮ⦪[PR'(5Q _ C6=Gv%2ZRymn@|}_d1KS5Ak=K$@Y'>ϕᠹŸE9ngz?3lPۨ/-~JNګ S8 &+GOkjtIR*nJv6UE$8;:+r8837ϪsD]M/-ja ߲oH7~*8Q OM՛, *n1PK!indextools/__init__.pyPK!?!4indextools/bed.pyPK!7indextools/console/__init__.pyPK!y  sindextools/console/commands.pyPK!==&indextools/console/features.pyPK!;T-indextools/console/partition.pyPK!<~ =indextools/console/split.pyPK!E}TGGKindextools/index.pyPK!~p\\indextools/intervals.pyPK!Ecllindextools/regions.pyPK!ѷ+xindextools/utils.pyPK!H$>(<+"indextools-0.1.1.dist-info/entry_points.txtPK!Ο))"A#indextools-0.1.1.dist-info/LICENSEPK!HnHTU 'indextools-0.1.1.dist-info/WHEELPK!H@I47#<(indextools-0.1.1.dist-info/METADATAPK!HJK'!)indextools-0.1.1.dist-info/RECORDPK,