PK!+U00ncbi_taxid/__init__.py__version__ = '0.1.0' __all__ = ['taxid_utils'] PK!Srncbi_taxid/cli.pyimport csv from typing import Optional, TextIO import sys import click from .taxid_utils import TaxIDExpander @click.command(short_help='Annotate diamond output with taxonomy names') @click.option('--taxdump_filename', type=click.Path(exists=True, file_okay=True), help='Path to local copy of NCBI taxdump.tar.gz file') @click.option('--taxdb_filename', type=click.Path(), help='Name for the processed database, will be loaded if it exists') @click.option('--diamond_output_format', default='6 qseqid sseqid pident length mismatch gapopen qstart qend sstart send evalue bitscore slen qlen qcovhsp stitle staxids', help='Output format used by DIAMOND (most include staxids)') @click.option('--output_file', type=click.File('w'), default=sys.stdout, help='Output file to write output with expanded taxonomy information (TSV format)') @click.argument('diamond_output_file', type=click.File()) def annotate_diamond(diamond_output_file: TextIO, diamond_output_format: str, output_file: TextIO, taxdump_filename: Optional[str] = None, taxdb_filename: Optional[str] = None): annotater = TaxIDExpander(taxdb_filename=taxdb_filename, taxdump_filename=taxdump_filename) assert 'staxids' in diamond_output_format, "The DIAMOND output format must include the staxids column" taxid_column = diamond_output_file.split().index('staxids') - 1 # the column position, minus 1 to ignore '6' output = csv.writer(output_file, delimiter='\t') for row in csv.reader(diamond_output_file, delimiter='\t'): taxid = int(row[taxid_column]) lineage_info = annotater.get_lineage(taxid, only_standard_ranks=True) output_row = row + [taxon_info[0] for taxon_info in lineage_info] output.write_row(output_row) output.close() if __name__ == '__main__': annotate_diamond() # pylint: disable=E1120 PK!HV V ncbi_taxid/taxid_utils.pyfrom pathlib import Path from typing import Optional, List, Tuple from ete3 import NCBITaxa class TaxIDExpander(object): def __init__(self, taxdump_filename: str = None, taxdb_filename: str = None) -> 'TaxIDExpander': """Constructor for TaxIDExpander Args: taxdump_filename(str): if specified, refers to a local copy of the NCBI taxdump.tar.gz file taxdb_filename(str): if specified will be used to look for a db containing the NCBI database to load. if both taxdump_filename and taxdb_filename are set, save to taxdb_filename """ if taxdump_filename is not None: taxdump_path = Path(taxdump_filename) if not (taxdump_path.exists() and taxdump_path.is_file()): raise ValueError(f'{taxdump_filename} must be a readable file') if taxdb_filename is not None: # we have both a taxdump file and a taxdb file # this means we load from taxdump file and save to taxdb file self.ncbi = NCBITaxa(taxdump_file=taxdump_filename, dbfile=taxdb_filename) else: # we have a taxdump file and no taxdb file # this means we load from the taxdump file and let ete3 save to its default location self.ncbi = NCBITaxa(taxdump_file=taxdb_filename) else: if taxdb_filename is not None: # we have a taxdb file and no taxdump file # this means we load the database from the taxdb file taxdb_path = Path(taxdb_filename) if not (taxdb_path.exists() and taxdb_path.is_file()): raise ValueError(f'{taxdb_filename} must be a readable file') self.ncbi = NCBITaxa(dbfile=taxdb_filename) else: # we have neither a taxdump file nor a taxdb file # this means ete3 loads the database over the network (and cache in local directory) # and let ete3 save the taxdb to its default location self.ncbi = NCBITaxa() def get_lineage(self, taxid: str, only_standard_ranks: Optional[bool] = False) -> List[Tuple[str, str]]: """Return lineage for a given taxonomy ID Raises ValueError if taxonomy ID is not found. Args: taxid(str): NCBI taxonomy ID only_standard_ranks(bool): if True only return superkingdom, phylum, class, order, family, genus and species ranks Returns: list of tuples where the tuples have members (taxon name, taxon rank)""" lineage_ids = self.ncbi.get_lineage(taxid) names = self.ncbi.get_taxid_translator(lineage_ids) ranks = self.ncbi.get_rank(lineage_ids) standard_ranks = set(['superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species']) lineage = [] for id in lineage_ids: rank = ranks[id] if only_standard_ranks and rank not in standard_ranks: continue lineage.append((names[id], ranks[id])) return lineage PK!HBH+ncbi_taxid-0.1.0.dist-info/entry_points.txtN+I/N.,()JLKOLI/IϭKN2Ss2KKR㡊PK!H\TT ncbi_taxid-0.1.0.dist-info/WHEEL 1 0 нR \I$ơ7.ZON `h6oi14m,b4>4ɛpK>X;baP>PK!HKJ#ncbi_taxid-0.1.0.dist-info/METADATAAO0 >2iZ6meb,t$m7{)b+prQ% lXFJt3L:k5ykINKP>K`C.nk%Ep@sBE#"7M@aF}x=<&,-M 5NRC-S?׷Gkհ@Ww)褟l}U˫َo{Qiؼl:Vpr"B姫 PK!H-ݴ)_!!ncbi_taxid-0.1.0.dist-info/RECORD}K0`sC`UqIa8MD_[[e'/}qʑWrFDB7?FĢaYYfYޢ,M؉?9l ]%y"\-[7xbyK.ErXS|%*~O9Y+P9|u~zRԧ-LgZ|йvLh?oӁ `T\*{iqFpkyc8rs4K]<06AV$f6KH;̄& Tw"+ϗ:͵^ >CNovM=U ]睰L,״?PK!+U00ncbi_taxid/__init__.pyPK!Srdncbi_taxid/cli.pyPK!HV V Rncbi_taxid/taxid_utils.pyPK!HBH+ncbi_taxid-0.1.0.dist-info/entry_points.txtPK!H\TT jncbi_taxid-0.1.0.dist-info/WHEELPK!HKJ#ncbi_taxid-0.1.0.dist-info/METADATAPK!H-ݴ)_!!!ncbi_taxid-0.1.0.dist-info/RECORDPK