# File: DeFCoM-1.0.1.data/scripts/predict.py
#!python
from ConfigParser import SafeConfigParser
from defcom.alignment import Alignments
from defcom.features import Features
import defcom.motif_sites as ms
import numpy as np
import os
import pickle
import sys
########
# TO DO: REFACTOR CODE
########
def main():
"""Run the classification phase of DeFCoM.
Args:
sys.argv[1] = A python configuration file. For configuration file
details see https://docs.python.org/2/library/configparser.html.
"""
#Load parameters from config file
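#Illustrative config sketch (not part of DeFCoM): the section and key names
#below mirror the parser.get* calls in this script; all paths and values are
#placeholders to be replaced by the user.
#
#   [data]
#   candidate_sites_file = /path/to/candidate_sites.bed
#   candidate_bam_file = /path/to/dnase.bam
#
#   [options]
#   f_offset = 0
#   r_offset = 0
#   flank_size = 100
#   combine_strands = True
#   bias_correction = False
#   model_data_file = /path/to/model.pickle
#   results_file = /path/to/defcom_scores.bed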
config_file = sys.argv[1]
parser = SafeConfigParser(os.environ)
parser.read(config_file)
candidate_sites_file = parser.get('data', 'candidate_sites_file')
bam_file = parser.get('data', 'candidate_bam_file')
f_offset = parser.getint('options', 'f_offset')
r_offset = parser.getint('options', 'r_offset')
flank_size = parser.getint('options', 'flank_size')
combine_strands = parser.getboolean('options', 'combine_strands')
bias_correction = parser.getboolean('options', 'bias_correction')
model_data_file = parser.get('options', 'model_data_file')
results_file = parser.get('options', 'results_file')
#Create data objects
bam_data = Alignments(bam_file)
candidate_data = ms.MotifSites(candidate_sites_file)
model_data = pickle.load(open(model_data_file, 'rb'))
svm = model_data["svm"]
feature_scaler = model_data["feature_scaler"]
#Prepare data
candidate_sites = candidate_data.get_all_sites()
cut_profiles = Features.convert_to_cuts(candidate_sites, bam_data,
flank_size, combine_strands, f_offset, r_offset, bias_correction)
candidate_sites = candidate_data.get_all_sites() #Re-fetch: pysam iterators are single-pass
pwm_scores = Features.get_pwm_scores(candidate_sites)
#Apply square root transformation
cut_profiles = np.sqrt(cut_profiles)
#Apply feature extraction
partition_fn = np.vectorize(lambda x:x**2 + 5)
split_pts = Features.get_splits(cut_profiles[0].size, partition_fn)
center = cut_profiles[0].size/2 #Center index of the cut profile
left = center - 15 #Left bound of the 30-bp central window
right = center + 15 #Right bound of the 30-bp central window
candidate_features = []
for i in xrange(cut_profiles.shape[0]):
row = cut_profiles[i]
#Get local features
row_split = np.split(row, split_pts)
local_slopes = Features.get_slopes(row_split)
local_means = np.vectorize(np.mean)(row_split)
#Get regional features
row_split = np.array([row[(left-30):left], row[left:right],
row[right:(right+30)]])
regional_slopes = Features.get_slopes(row_split)
regional_means = np.mean(row_split, axis=1)
global_mean = [np.mean(row[(center-75):(center+75)])]
row_features = np.concatenate((local_slopes, local_means,
regional_slopes, regional_means, global_mean, [pwm_scores[i]]))
candidate_features.append(row_features)
#Scale features
candidate_features = feature_scaler.transform(candidate_features)
#Predict and output to file
scores = svm.decision_function(candidate_features)
fout = open(results_file, 'w')
i = 0
candidate_sites = candidate_data.get_all_sites() #Re-fetch: pysam iterators are single-pass
for site in candidate_sites:
updated_row = list(site) + [str(scores[i])]
fout.write("\t".join(updated_row) + "\n")
i += 1
fout.close()
if __name__ == '__main__':
sys.exit(main())

# File: DeFCoM-1.0.1.data/scripts/train.py
#!python
from ConfigParser import SafeConfigParser
from collections import Counter
from defcom.alignment import Alignments
from defcom.features import Features
from scipy import stats
from sklearn import preprocessing, model_selection, svm, metrics
import defcom.motif_sites as ms
import defcom.performance_stats as performance_stats
import numpy as np
import os
import pickle
import sys
########
# TO DO: REFACTOR CODE
########
def main():
"""Run the training phase of DeFCoM.
Args:
sys.argv[1] = A python configuration file. For configuration file
details see https://docs.python.org/2/library/configparser.html.
"""
#Load parameters from config file
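#Illustrative config sketch (not part of DeFCoM): the section and key names
#below mirror the parser.get* calls in this script; all paths, set sizes, and
#iteration counts are arbitrary placeholders.
#
#   [data]
#   active_sites_file = /path/to/active_sites.bed
#   inactive_sites_file = /path/to/inactive_sites.bed
#   training_bam_file = /path/to/dnase.bam
#
#   [options]
#   f_offset = 0
#   r_offset = 0
#   flank_size = 100
#   combine_strands = True
#   bias_correction = False
#   bootstrap_iterations = 5
#   bootstrap_active_set_size = 1000
#   bootstrap_inactive_set_size = 1000
#   training_active_set_size = 5000
#   training_inactive_set_size = 5000
#   memory_limit = 1000
#   model_data_file = /path/to/model.pickle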
config_file = sys.argv[1]
parser = SafeConfigParser(os.environ)
parser.read(config_file)
active_sites_file = parser.get('data', 'active_sites_file')
inactive_sites_file = parser.get('data', 'inactive_sites_file')
bam_file = parser.get('data', 'training_bam_file')
f_offset = parser.getint('options', 'f_offset')
r_offset = parser.getint('options', 'r_offset')
flank_size = parser.getint('options', 'flank_size')
combine_strands = parser.getboolean('options', 'combine_strands')
bias_correction = parser.getboolean('options', 'bias_correction')
bootstrap_iterations = parser.getint('options', 'bootstrap_iterations')
bootstrap_active_set_size = parser.getint('options',
'bootstrap_active_set_size')
bootstrap_inactive_set_size = parser.getint('options',
'bootstrap_inactive_set_size')
training_active_set_size = parser.getint('options',
'training_active_set_size')
training_inactive_set_size = parser.getint('options',
'training_inactive_set_size')
memory = parser.getint('options', 'memory_limit')
model_data_file = parser.get('options', 'model_data_file')
print "making data"
#Create data objects
bam_data = Alignments(bam_file)
active_sites = ms.MotifSites(active_sites_file)
inactive_sites = ms.MotifSites(inactive_sites_file)
print "begin bootstrap"
#Execute parameter estimation via bootstrapping
gammas = []
c_rbf = []
c_linear = []
rbf_pauc = []
lin_pauc = []
active_bootstrap = active_sites.subsample_sites(
bootstrap_active_set_size, bootstrap_iterations)
inactive_bootstrap = inactive_sites.subsample_sites(
bootstrap_inactive_set_size, bootstrap_iterations)
for i in range(bootstrap_iterations):
temp_active = active_bootstrap[i]
temp_inactive = inactive_bootstrap[i]
print "to cuts"
active_cuts = Features.convert_to_cuts(temp_active, bam_data,
flank_size, combine_strands, f_offset, r_offset, bias_correction)
inactive_cuts = Features.convert_to_cuts(temp_inactive, bam_data,
flank_size, combine_strands, f_offset, r_offset, bias_correction)
active_pwm = Features.get_pwm_scores(temp_active)
inactive_pwm = Features.get_pwm_scores(temp_inactive)
print "sqrt transform"
#Apply square root transformation
active_cuts = np.sqrt(active_cuts)
inactive_cuts = np.sqrt(inactive_cuts)
#Apply feature extraction
partition_fn = np.vectorize(lambda x:x**2 + 5)
split_pts = Features.get_splits(active_cuts[0].size, partition_fn)
center = active_cuts[0].size/2 #Center index of the cut profile
left = center - 15 #Left bound of the 30-bp central window
right = center + 15 #Right bound of the 30-bp central window
active_features = []
inactive_features = []
print "feature extraction"
for i in xrange(active_cuts.shape[0]):
row = active_cuts[i]
#Get local features
row_split = np.split(row, split_pts)
local_slopes = Features.get_slopes(row_split)
local_means = np.vectorize(np.mean)(row_split)
#Get regional features
row_split = np.array([row[(left-30):left], row[left:right],
row[right:(right+30)]])
regional_slopes = Features.get_slopes(row_split)
regional_means = np.mean(row_split, axis=1)
global_mean = [np.mean(row[(center-75):(center+75)])]
row_features = np.concatenate((local_slopes, local_means,
regional_slopes, regional_means, global_mean, [active_pwm[i]]))
active_features.append(row_features)
for i in xrange(inactive_cuts.shape[0]):
row = inactive_cuts[i]
#Get local features
row_split = np.split(row, split_pts)
local_slopes = Features.get_slopes(row_split)
local_means = np.vectorize(np.mean)(row_split)
#Get regional features
row_split = np.array([row[(left-30):left], row[left:right],
row[right:(right+30)]])
regional_slopes = Features.get_slopes(row_split)
regional_means = np.mean(row_split, axis=1)
global_mean = [np.mean(row[(center-75):(center+75)])]
row_features = np.concatenate((local_slopes, local_means,
regional_slopes, regional_means, global_mean,
[inactive_pwm[i]]))
inactive_features.append(row_features)
combined_features = np.concatenate((active_features,
inactive_features), axis=0)
feature_scaler = preprocessing.MaxAbsScaler()
training_set = feature_scaler.fit_transform(combined_features)
active_labels = [1] * len(active_features)
inactive_labels = [-1] * len(inactive_features)
training_labels = active_labels + inactive_labels
print "cross-validation"
#Perform cross-validation
gamma_range = np.logspace(-6,3,10)
c_range = np.logspace(-3,4,8)
parameters = {'C':c_range, 'gamma':gamma_range}
pauc_scorer = metrics.make_scorer(performance_stats.pAUC,
needs_threshold=True, fpr_cutoff=0.05)
#RBF kernel SVM
gridCV = model_selection.GridSearchCV(
svm.SVC(cache_size=memory, kernel="rbf"),
parameters, n_jobs=3, pre_dispatch=3, cv=5,
scoring=pauc_scorer, refit=False)
gridCV.fit(training_set, training_labels)
c_rbf.append(gridCV.best_params_["C"])
gammas.append(gridCV.best_params_["gamma"])
rbf_pauc.append(gridCV.best_score_)
#Linear SVM
parameters = {'C':c_range}
gridCV = model_selection.GridSearchCV(svm.SVC(kernel="linear"),
parameters, n_jobs=3, pre_dispatch=3, cv=5,
scoring=pauc_scorer, refit=False)
gridCV.fit(training_set, training_labels)
c_linear.append(gridCV.best_params_["C"])
lin_pauc.append(gridCV.best_score_)
print "t-test"
#Use a t-test to choose between linear and RBF kernels; prefer the simpler
#linear model when performance is not significantly different
pval = stats.ttest_ind(lin_pauc, rbf_pauc)[1]
if pval > 0.01:
best_c = Counter(c_linear).most_common(1)[0][0]
final_model = svm.SVC(kernel="linear", C=best_c)
elif np.mean(lin_pauc) > np.mean(rbf_pauc):
best_c = Counter(c_linear).most_common(1)[0][0]
final_model = svm.SVC(kernel="linear", C=best_c)
else:
best_c = Counter(c_rbf).most_common(1)[0][0]
best_gamma = Counter(gammas).most_common(1)[0][0]
final_model = svm.SVC(kernel="rbf", C=best_c, gamma=best_gamma)
print "final model training"
#Train final model
active_set = active_sites.subsample_sites(training_active_set_size)[0]
inactive_set = inactive_sites.subsample_sites(training_inactive_set_size)[0]
active_cuts = Features.convert_to_cuts(active_set, bam_data, flank_size,
combine_strands, f_offset, r_offset, bias_correction)
inactive_cuts = Features.convert_to_cuts(inactive_set, bam_data,
flank_size, combine_strands, f_offset, r_offset, bias_correction)
active_pwm = Features.get_pwm_scores(active_set)
inactive_pwm = Features.get_pwm_scores(inactive_set)
#Apply square root transformation
active_cuts = np.sqrt(active_cuts)
inactive_cuts = np.sqrt(inactive_cuts)
print "feature extraction"
#Apply feature extraction
partition_fn = np.vectorize(lambda x:x**2 + 5)
split_pts = Features.get_splits(active_cuts[0].size, partition_fn)
center = active_cuts[0].size/2 #Center index of the cut profile
left = center - 15 #Left bound of the 30-bp central window
right = center + 15 #Right bound of the 30-bp central window
active_features = []
inactive_features = []
for i in xrange(active_cuts.shape[0]):
row = active_cuts[i]
#Get local features
row_split = np.split(row, split_pts)
local_slopes = Features.get_slopes(row_split)
local_means = np.vectorize(np.mean)(row_split)
#Get regional features
row_split = np.array([row[(left-30):left], row[left:right],
row[right:(right+30)]])
regional_slopes = Features.get_slopes(row_split)
regional_means = np.mean(row_split, axis=1)
global_mean = [np.mean(row[(center-75):(center+75)])]
row_features = np.concatenate((local_slopes, local_means,
regional_slopes, regional_means, global_mean, [active_pwm[i]]))
active_features.append(row_features)
for i in xrange(inactive_cuts.shape[0]):
row = inactive_cuts[i]
#Get local features
row_split = np.split(row, split_pts)
local_slopes = Features.get_slopes(row_split)
local_means = np.vectorize(np.mean)(row_split)
#Get regional features
row_split = np.array([row[(left-30):left], row[left:right],
row[right:(right+30)]])
regional_slopes = Features.get_slopes(row_split)
regional_means = np.mean(row_split, axis=1)
global_mean = [np.mean(row[(center-75):(center+75)])]
row_features = np.concatenate((local_slopes, local_means,
regional_slopes, regional_means, global_mean, [inactive_pwm[i]]))
inactive_features.append(row_features)
combined_features = np.concatenate((active_features,
inactive_features), axis=0)
feature_scaler = preprocessing.MaxAbsScaler()
training_set = feature_scaler.fit_transform(combined_features)
active_labels = [1] * len(active_features)
inactive_labels = [-1] * len(inactive_features)
training_labels = active_labels + inactive_labels
print "final model fit"
final_model.fit(training_set, training_labels)
model_data = {"svm": final_model, "feature_scaler": feature_scaler}
print "pickling"
pickle.dump(model_data, open(model_data_file, 'wb'))
if __name__ == '__main__':
sys.exit(main())

# File: defcom/motif_sites.py
#---------------------------------------------------------------------
# DeFCoM: A supervised learning genomic footprinter
# Copyright (C) 2016 Bryan Quach
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
from bisect import bisect_left
from os import path
import pysam
import random
class MotifSites(object):
"""Access and process pseudo-BED annotations for motif sites.
Provides functionality for working with motif predictions. Assumes that
coordinates are 0-based [start, end).
Attributes:
_motif_sites: A pysam Tabixfile object.
filename: The filename associated with the Tabixfile object.
num_sites: Number of motif site annotations in the given file
Args:
motif_file: A string specifying the path and filename of the
pseudo-BED file containing motif sites. The file must be gzipped
and have the '.gz' file extension. It should also have an index
file with the same name but with the '.tbi' extension appended.
For example, '/path/file.bed' should be gzipped to
'/path/file.bed.gz' and have an index file called
'/path/file.bed.gz.tbi'.
Raises:
IOError: File could not be found or opened.
IOError: File index could not be found.
IOError: Gzipped version of file could not be found.
"""
def __init__(self, motif_file):
"""Initialize MotifSites class."""
self._motif_sites = self.__load_motif_sites(motif_file)
self.filename = motif_file
self.num_sites = len(tuple(
self._motif_sites.fetch(multiple_iterators=True)))
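#Input preparation sketch (assumes the htslib command-line tools 'bgzip' and
#'tabix' are available; tabix requires bgzip compression rather than plain
#gzip). The path below is a placeholder.
#
#   bgzip -c /path/file.bed > /path/file.bed.gz
#   tabix -p bed /path/file.bed.gz
#
#   sites = MotifSites('/path/file.bed')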
def __del__(self):
"""Destructor for MotifSites.
Closes the Tabixfile object in '_motif_sites'.
"""
try:
self._motif_sites.close()
except:
pass
def __load_motif_sites(self, filename):
"""Load the pseudo-BED file data into a Tabixfile object.
Args:
filename: A string for the pseudo-BED file. The path should be
prepended if the file is located outside the working directory.
Returns:
A pysam Tabixfile object.
Raises:
IOError: File could not be found or opened.
IOError: File index could not be found.
IOError: Gzipped version of file could not be found.
"""
print "Loading BED file %s..." % path.basename(filename)
if not path.isfile(filename):
error_msg = "%s could not be found." % filename
raise IOError(error_msg)
if not path.isfile(filename + ".gz"):
error_msg = "%s could not be found." % (filename + ".gz")
raise IOError(error_msg)
if not path.isfile(filename + ".gz.tbi"):
error_msg = "Index file %s not found." % (filename + ".gz.tbi")
raise IOError(error_msg)
tabixfile = pysam.Tabixfile(filename + ".gz", "r")
return tabixfile
def get_all_sites(self):
"""Retrieve all motif sites in the MotifSites object.
Retrieves all the motif sites for the MotifSites object and stores
them in an iterator.
Returns:
A pysam iterator containing motif site data.
"""
return self._motif_sites.fetch(parser=pysam.asTuple(),
multiple_iterators=True)
def get_chr_sites(self, chrom, multi_iter=False):
"""Retrieve all motif sites on a specified chromosome.
Retrieves all the motif sites in the MotifSites object from a
specified chromosome and stores them in an iterator.
Args:
chrom: A string specifying the chromosome of interest. Must follow
the naming convention specified by the pseudo-BED file.
multi_iter: A boolean indicating whether to enable multiple
iterators. This should be 'True' if multiple calls are made to
this method concurrently for the same MotifSites object.
Returns:
A pysam iterator that contains motif site data.
Raises:
ValueError: The genomic coordinates are out of range or invalid.
"""
return self._motif_sites.fetch(reference=chrom,
parser=pysam.asTuple(), multiple_iterators=multi_iter)
def get_sites(self, chrom, start, end, multi_iter=False):
"""Retrieve motif sites overlapping a genomic interval.
Retrieves the motif sites in the MotifSites object that overlap
with a specified genomic interval. Does not require complete overlap.
Args:
chrom: A string specifying the chromosome of interest. Must follow
the naming convention specified by the pseudo-BED file.
start: An int for the 0-based start position of the genomic region.
end: An int for the 0-based end position of the genomic region. The
position 'end' denotes one position past the region of interest.
multi_iter: A boolean indicating whether to enable multiple
iterators. This should be 'True' if multiple calls are made to
this method concurrently for the same MotifSites object.
Returns:
A pysam iterator that contains motif site data.
Raises:
ValueError: The genomic coordinates are out of range or invalid.
"""
return self._motif_sites.fetch(chrom, start, end,
parser=pysam.asTuple(), multiple_iterators=multi_iter)
def subsample_sites(self, n_samples, iterations=1):
"""Choose a random subset of motif sites.
Chooses a random subset of motif sites assuming that n_samples is less
than the total number of sites. If n_samples is greater than or equal
to the total number of sites, then the whole set is returned.
Args:
n_samples: The number of sites to subsample in each iteration.
iterations: The number of independent subsamples to draw.
Returns:
A list of length 'iterations' where each element is a list of
pseudo-BED tuples for the sites selected in that iteration.
"""
########
# TO DO: APPLY PARALLEL PROCESSING FOR OBTAINING SAMPLES
########
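#Usage sketch (mirrors the bootstrap step in train.py): given a MotifSites
#instance 'msites' (name illustrative), draw 10 independent subsamples of 500
#sites each; the result is a list of 10 lists of pseudo-BED tuples.
#   subsets = msites.subsample_sites(500, iterations=10)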
def binary_search(a, x, lo=0, hi=None):
"""Binary search algorithm."""
hi = hi if hi is not None else len(a)
pos = bisect_left(a,x,lo,hi)
return pos != hi and a[pos] == x
counter = 0
site_list = [[] for i in xrange(iterations)]
indices = range(self.num_sites)
#Apply random sampling if needed
sample_idx = []
for i in xrange(iterations):
if self.num_sites <= n_samples:
n_samples = self.num_sites
subset = indices
else:
subset = random.sample(indices, n_samples)
subset.sort()
sample_idx.append(subset)
motif_sites = self._motif_sites.fetch(parser=pysam.asTuple(),
multiple_iterators=True)
for site in motif_sites:
for i in xrange(iterations):
idx = sample_idx[i]
if binary_search(idx, counter, hi=n_samples):
site_list[i].append(site)
counter += 1
return site_list

# File: defcom/features.py
#---------------------------------------------------------------------
# DeFCoM: A supervised learning genomic footprinter
# Copyright (C) 2016 Bryan Quach
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
from types import IntType
import numpy as np
import math
class Features(object):
"""Functions for applying feature extraction."""
@classmethod
def get_splits(self, length, partition_fn):
"""Get indices for dividing a vector into multiple parts.
Gets the indices for partitioning a vector into multiple subvectors
based on a supplied partition function. If the partition function does
not evenly divide the vector then the segments furthest from the vector
center will include the remainder. This function is designed to easily
create partitions whose sizes are approximately symmetric about the
vector center but may vary in length otherwise. The list of indices
matches the format required as an argument for numpy.split().
Args:
length: An int specifying the size of the vector to partition.
partition_fn: A vectorized callable that accepts and returns int
types. A passed int value denotes the distance from the center
of the vector in partition function distance. A returned int
should specify the size of the partition at the provided
partition distance. We define a partition distance unit to be
the number of partitions away from a specified point where 0
denotes an initial partition that borders a specified point.
Returns:
An array containing the indices at which a vector of the size
specified would be split based on the given partition function.
Raises:
AssertionError: The length supplied is not positive.
TypeError: Partition function is not numpy vectorized.
TypeError: The 'length' argument is not the correct type.
ValueError: Non-positive value returned by partition function.
"""
if type(length) is not IntType:
error_msg = "The 'length' argument must be an int"
raise TypeError(error_msg)
assert length > 0, "Length value is not positive"
if type(partition_fn) is not np.lib.function_base.vectorize:
error_msg = "Partition function is not vectorized"
raise TypeError(error_msg)
#Get the number of partitions needed to span the vector
pdist = 0
error_msg = "Non-positive value returned by partition function"
if partition_fn(pdist) < 1: raise ValueError(error_msg)
windows_sum = partition_fn(pdist) * 2
while windows_sum <= length:
pdist += 1
if partition_fn(pdist) < 1: raise ValueError(error_msg)
windows_sum = partition_fn(np.arange(pdist + 1)).sum() * 2
#Get the window sizes to use
right_windows = partition_fn(np.arange(pdist))
left_windows = partition_fn(np.arange(pdist - 1, -1, -1))
#Update the first window size to include the remainder
remainder = length - left_windows.sum() * 2
left_remainder = int(math.floor(remainder / 2.0))
left_windows[0] += left_remainder
#Make partitions
left_splits = [left_windows[:(i+1)].sum()
for i in range(left_windows.size)]
center = left_windows.sum()
right_splits = [right_windows[:(i+1)].sum() + center
for i in range(right_windows.size-1)]
splits = np.append(left_splits, right_splits)
return splits
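#Usage sketch for get_splits (mirrors the call made in the train/predict
#scripts): window sizes grow quadratically with distance from the profile
#center; 'cut_profile' stands in for a 1D numpy array of cut counts.
#   partition_fn = np.vectorize(lambda x: x**2 + 5)
#   split_pts = Features.get_splits(cut_profile.size, partition_fn)
#   segments = np.split(cut_profile, split_pts)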
@classmethod
def slope(self, x, y):
"""Calculate a slope.
Args:
x = An array-like of x-coordinates.
y = An array-like of y-coordinates.
Returns:
A numpy float64 value for the slope.
Raises:
AssertionError: Insufficient number of coordinates given.
ValueError: x and y coordinate vectors not equal in length.
"""
assert len(y) > 1, "Insufficient number of coordinates given."
if type(x) is not np.ndarray:
x = np.array(x)
if type(y) is not np.ndarray:
y = np.array(y)
numerator = (x * y).mean() - x.mean() * y.mean()
denominator = (x**2).mean() - x.mean()**2
return numerator/denominator
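#Worked example of the least-squares formula above: for x = [0, 1, 2] and
#y = [2, 4, 6], mean(x*y) - mean(x)*mean(y) = 16/3 - 4 = 4/3 and
#mean(x**2) - mean(x)**2 = 5/3 - 1 = 2/3, so slope(x, y) returns 2.0.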
@classmethod
def get_slopes(self, vector):
"""Extract slope values for segments of a given vector.
Computes the slope assuming that the segments of the given vector
represent y-coordinate values and x-coordinate values are spaced
1 unit apart.
Args:
vector: An array-like of numpy arrays with numeric values.
Returns:
A numpy array with slope values for each segment in the given
vector.
Raises:
AttributeError: Segments of vector are not numpy arrays.
"""
get_x = lambda x: np.arange(x.size)
x_vals = [get_x(y) for y in vector]
return np.array([self.slope(x,y) for x,y in zip(x_vals, vector)])
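#Example (values follow from Features.slope with unit-spaced x-coordinates):
#   Features.get_slopes([np.array([2., 4., 6.]), np.array([5., 5., 5.])])
#   #returns array([2., 0.])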
@classmethod
def get_pwm_scores(self, motif_sites, score_column=4):
"""Retrieve PWM scores given a list of motif_sites.
Gets PWM scores from a 2D list of motif site data in Pseudo-BED format
and stores them into a list.
Args:
motif_sites: A 2D list or iterator of motif sites in
Pseudo-BED format.
score_column: The 2nd dimension index in "motif_sites" that
specifies the position in the list that contains the PWM score.
By default it is assumed that this is position 4.
Returns:
A list of floats containing the PWM scores, in the order the motif
sites are given.
"""
scores = []
for site in motif_sites:
scores.append(float(site[score_column]))
return scores
@classmethod
def convert_to_cuts(self, motif_sites, aln_data, flank_size,
combine_strands=True, f_offset=0, r_offset=0, use_weights=False):
"""Convert motif sites into a matrix of DNaseI digestion site counts.
Retrieves a vector of DNaseI digestion sites for each motif site in the
'motif_sites' list. Puts all the vectors into a 2D list.
Args:
motif_sites: A 2D list or iterator of motif sites in
Pseudo-BED format.
aln_data: An Alignment object.
flank_size: An int specifying how many bases upstream and
downstream of the motif site center to include. The value
specifies one flank. The total included bases is flank_size*2.
combine_strands: A boolean indicating if DNaseI digestion site
counts should be aggregated across strands. If 'False' then
the forward and reverse strand digestion count vectors are
concatenated with the forward strand being first.
f_offset: An int denoting the offset downstream from the 5' end of
a forward (+) strand read. Equivalent to read shifting.
r_offset: An int denoting the offset upstream from the 5' end of a
reverse (-) strand read. Equivalent to read shifting.
use_weights: A boolean indicating whether bias correction weights
should be used for each read.
Returns:
A 2D list where each inner list contains DNaseI digestion counts
per base within and/or around a motif site. Each index of the first
dimension corresponds to a motif site.
Raises:
ValueError: The genomic coordinates are out of range or invalid.
"""
cut_matrix = []
for motif_site in motif_sites:
chrom = motif_site[0]
start = int(motif_site[1])
end = int(motif_site[2])
midpoint = (start+end)/2
#Extend region by flank size
start = midpoint - flank_size
end = midpoint + flank_size
#Get cut sites
if combine_strands:
cut_sites = aln_data.get_cut_sites(chrom, start, end,
strand="combined", f_offset=f_offset, r_offset=r_offset,
use_weights=use_weights, multi_iter=False)
else:
cut_sites = aln_data.get_cut_sites(chrom, start, end,
strand=None, f_offset=f_offset, r_offset=r_offset,
use_weights=use_weights, multi_iter=False)
cut_matrix.append(cut_sites)
return cut_matrix
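#Usage sketch for convert_to_cuts (mirrors predict.py): build one cut-count
#profile per candidate site. Here 'sites_iter' and 'bam_data' stand in for a
#MotifSites iterator and an Alignments object, and the parameter values are
#placeholders.
#   cut_profiles = Features.convert_to_cuts(sites_iter, bam_data, 100,
#       combine_strands=True, f_offset=0, r_offset=0, use_weights=False)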

# File: defcom/performance_stats.py
#---------------------------------------------------------------------
# DeFCoM: A supervised learning genomic footprinter
# Copyright (C) 2016 Bryan Quach
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
"""Methods for computing performance statistics."""
import uuid
import subprocess
from numpy import concatenate
from sklearn.metrics import roc_curve, auc
def pAUC(y_true, y_score, fpr_cutoff):
"""Calculate the partial area under the ROC curve (pAUC).
Computes a pAUC value up to a specified false positive rate (FPR)
cutoff. Note that the pAUC is approximated: its accuracy depends on the
resolution of the intermediate ROC curve. The TPR at the FPR cutoff is
linearly interpolated from the ROC data points bracketing the cutoff,
and when several TPR values share an FPR value the highest one is used.
Args:
y_true: Array-like of true binary class labels in range {0, 1} or
{-1, 1} corresponding to y_score. The larger value represents
the positive class.
y_score: Array-like of target scores with higher scores indicating
more confidence in the positive class.
fpr_cutoff: A float specifying the FPR cutoff to use in computing
the pAUC. Must be in the interval (0,1).
Returns:
A float representing the pAUC value.
Raises:
AssertionError: The FPR cutoff is not in the interval (0,1)
"""
error_msg = "FPR cutoff must be in (0,1)"
assert 0.0 < fpr_cutoff < 1.0, error_msg
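#For reference, train.py wraps this function as a scikit-learn scorer via
#sklearn.metrics.make_scorer; the 0.05 cutoff below is just an example value.
#   pauc_scorer = make_scorer(pAUC, needs_threshold=True, fpr_cutoff=0.05)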
fpr, tpr, _ = roc_curve(y_true, y_score, drop_intermediate=False)
index_low = len([1 for i in fpr if i < fpr_cutoff])-1
index_high = index_low + 1
#Get interpolated TPR value from FPR cutoff
if index_low == -1: #No ROC data points lower than cutoff
x0 = fpr[0]
try:
x1 = min([xv for (c,xv) in zip([x>x0 for x in fpr],fpr) if c])
except ValueError:
x1 = x0
y0 = max([yv for (c,xv,yv) in zip([x==x0 for x in fpr],fpr,tpr) if c])
y1 = max([yv for (c,xv,yv) in zip([x==x1 for x in fpr],fpr,tpr) if c])
#Apply line derived from two closest points from FPR cutoff
tpr_cutoff = fpr_cutoff*((y1-y0)/(x1-x0)) + ((x1*y0-x0*y1)/(x1-x0))
#Segment full ROC to get partial ROC
fpr = [0.0] + [fpr_cutoff]
tpr = [0.0] + [tpr_cutoff]
elif index_high == len(fpr): #No ROC data points higher than cutoff
try:
x0 = max([xv for (c,xv) in zip([x