# --- scvelo/__init__.py ---
"""scvelo - stochastic single cell RNA velocity"""

from .get_version import get_version
__version__ = get_version(__file__)
del get_version

from .read_load import AnnData, read, read_loom, load, read_csv
from .preprocessing.neighbors import Neighbors
from .tools.run import run_all
from .tools.velocity import Velocity
from .tools.velocity_graph import VelocityGraph

from . import pp
from . import tl
from . import pl
from . import datasets
from . import utils
from . import logging
from . import settings


# --- scvelo/datasets.py ---
"""Builtin Datasets."""

from .read_load import read, load
from .preprocessing.utils import cleanup
from anndata import AnnData
import numpy as np
import pandas as pd


def toy_data(n_obs):
    """Randomly samples from the Dentate Gyrus dataset.

    Arguments
    ---------
    n_obs: `int`
        Size of the sampled dataset

    Returns
    -------
    Returns `adata` object
    """
    adata = dentategyrus()
    indices = np.random.choice(adata.n_obs, n_obs)
    adata = adata[indices]
    adata.obs_names = np.array(range(adata.n_obs), dtype='str')
    adata.var_names_make_unique()
    return adata


def dentategyrus(adjusted=True):
    """Dentate Gyrus dataset from Hochgerner et al. (2018).

    Dentate gyrus is part of the hippocampus involved in learning, episodic memory
    formation and spatial coding. It is measured using 10X Genomics Chromium and
    described in Hochgerner et al. (2018). The data consists of 25,919 genes across
    3,396 cells and provides several interesting characteristics.

    Returns
    -------
    Returns `adata` object
    """
    if adjusted:
        filename = 'data/DentateGyrus/10X43_1.h5ad'
        url = 'https://github.com/theislab/scvelo_notebooks/raw/master/data/DentateGyrus/10X43_1.h5ad'
        adata = read(filename, backup_url=url, sparse=True, cache=True)

    else:
        filename = 'data/DentateGyrus/10X43_1.loom'
        url = 'http://pklab.med.harvard.edu/velocyto/DG1/10X43_1.loom'
        adata = read(filename, backup_url=url, cleanup=True, sparse=True, cache=True)
        cleanup(adata, clean='all', keep={'spliced', 'unspliced', 'ambiguous'})

        url_louvain = 'https://github.com/theislab/scvelo_notebooks/raw/master/data/DentateGyrus/DG_clusters.npy'
        url_umap = 'https://github.com/theislab/scvelo_notebooks/raw/master/data/DentateGyrus/DG_umap.npy'

        adata.obs['clusters'] = load('./data/DentateGyrus/DG_clusters.npy', url_louvain)
        adata.obsm['X_umap'] = load('./data/DentateGyrus/DG_umap.npy', url_umap)
        adata.obs['clusters'] = pd.Categorical(adata.obs['clusters'])

    return adata


def forebrain():
    """Developing human forebrain.

    Forebrain tissue of a week 10 embryo, focusing on the glutamatergic neuronal lineage.

    Returns
    -------
    Returns `adata` object
    """
    filename = 'data/ForebrainGlut/hgForebrainGlut.loom'
    url = 'http://pklab.med.harvard.edu/velocyto/hgForebrainGlut/hgForebrainGlut.loom'
    adata = read(filename, backup_url=url, cleanup=True, sparse=True, cache=True)
    adata.var_names_make_unique()
    return adata


def simulation(n_obs=300, n_vars=None, alpha=None, beta=None, gamma=None, alpha_=None, t_max=None,
               noise_model='normal', noise_level=1, switches=None):
    """Simulation of mRNA metabolism with transcription, splicing and degradation.

    Returns
    -------
    Returns `adata` object
    """
    from .tools.dynamical_model_utils import unspliced, spliced, vectorize

    def draw_poisson(n):
        from random import uniform  # draw from poisson
        t = np.cumsum([- .1 * np.log(uniform(0, 1)) for _ in range(n - 1)])
        return np.insert(t, 0, 0)  # prepend t0=0

    def simulate_dynamics(tau, alpha, beta, gamma, u0, s0, noise_model, noise_level):
        ut = unspliced(tau, u0, alpha, beta)
        st = spliced(tau, s0, u0, alpha, beta, gamma)

        if noise_model == 'normal':  # add noise
            ut += np.random.normal(scale=noise_level * np.percentile(ut, 99) / 10, size=len(ut))
            st += np.random.normal(scale=noise_level * np.percentile(st, 99) / 10, size=len(st))
        ut, st = np.clip(ut, 0, None), np.clip(st, 0, None)
        return ut, st

    def simulate_gillespie(alpha, beta, gamma):
        # update rules: transcription (u+1,s), splicing (u-1,s+1), degradation (u,s-1), nothing (u,s)
        update_rule = np.array([[1, 0], [-1, 1], [0, -1], [0, 0]])

        def update(props):
            if np.sum(props) > 0:
                props /= np.sum(props)
            p_cumsum = props.cumsum()
            p = np.random.rand()
            i = 0
            while p > p_cumsum[i]:
                i += 1
            return update_rule[i]

        u, s = np.zeros(len(alpha)), np.zeros(len(alpha))
        for i, alpha_i in enumerate(alpha):
            u_, s_ = (u[i - 1], s[i - 1]) if i > 0 else (0, 0)
            du, ds = update(props=np.array([alpha_i, beta * u_, gamma * s_]))
            u[i], s[i] = (u_ + du, s_ + ds)
        return u, s

    alpha = 5 if alpha is None else alpha
    beta = .5 if beta is None else beta
    gamma = .3 if gamma is None else gamma
    alpha_ = 0 if alpha_ is None else alpha_

    t = draw_poisson(n_obs)
    if t_max is not None:
        t *= t_max / np.max(t)
    t_max = np.max(t)

    def cycle(array, n_vars=None):
        if isinstance(array, (list, tuple)):
            return array if n_vars is None else array * int(np.ceil(n_vars / len(array)))
        else:
            return [array] if n_vars is None else [array] * n_vars

    # switching time point obtained as fraction of t_max rounded down
    switches = cycle([.4, .7, 1, .1], n_vars) if switches is None else cycle(switches, n_vars)
    t_ = np.array([np.max(t[t < t_i * t_max]) for t_i in switches])

    noise_level = cycle(noise_level, len(switches) if n_vars is None else n_vars)

    n_vars = max(len(switches), len(noise_level))
    U = np.zeros(shape=(len(t), n_vars))
    S = np.zeros(shape=(len(t), n_vars))

    for i in range(n_vars):
        alpha_i = alpha[i] if isinstance(alpha, (tuple, list, np.ndarray)) else alpha
        beta_i = beta[i] if isinstance(beta, (tuple, list, np.ndarray)) else beta
        gamma_i = gamma[i] if isinstance(gamma, (tuple, list, np.ndarray)) else gamma

        tau, alpha_vec, u0_vec, s0_vec = vectorize(t, t_[i], alpha_i, beta_i, gamma_i, alpha_=alpha_, u0=0, s0=0)

        if noise_model == 'gillespie':
            U[:, i], S[:, i] = simulate_gillespie(alpha_vec, beta_i, gamma_i)  # per-gene rates
        else:
            U[:, i], S[:, i] = simulate_dynamics(tau, alpha_vec, beta_i, gamma_i, u0_vec, s0_vec,
                                                 noise_model, noise_level[i])

    obs = {'true_t': t.round(2)}
    var = {'true_t_': t_[:n_vars], 'true_alpha': np.ones(n_vars) * alpha, 'true_beta': np.ones(n_vars) * beta,
           'true_gamma': np.ones(n_vars) * gamma, 'true_scaling': np.ones(n_vars)}
    layers = {'unspliced':
U, 'spliced': S} return AnnData(S, obs, var, layers=layers) PK tNWscvelo/get_version.py""" A minimalistic version helper in the spirit of versioneer, that is able to run without build step using pkg_resources. Developed by P Angerer, see https://github.com/flying-sheep/get_version. """ import re import os from pathlib import Path from subprocess import run, PIPE, CalledProcessError from typing import NamedTuple, List, Union, Optional RE_VERSION = r"([\d.]+?)(?:\.dev(\d+))?(?:[_+-]([0-9a-zA-Z.]+))?" RE_GIT_DESCRIBE = r"v?(?:([\d.]+)-(\d+)-g)?([0-9a-f]{7})(-dirty)?" ON_RTD = os.environ.get("READTHEDOCS") == "True" def match_groups(regex, target): match = re.match(regex, target) if match is None: raise re.error(f"Regex does not match “{target}”. RE Pattern: {regex}", regex) return match.groups() class Version(NamedTuple): release: str dev: Optional[str] labels: List[str] @staticmethod def parse(ver): release, dev, labels = match_groups(f"{RE_VERSION}$", ver) return Version(release, dev, labels.split(".") if labels else []) def __str__(self): release = self.release if self.release else "0.0" dev = f".dev{self.dev}" if self.dev else "" labels = f'+{".".join(self.labels)}' if self.labels else "" return f"{release}{dev}{labels}" def get_version_from_dirname(name, parent): """Extracted sdist""" parent = parent.resolve() re_dirname = re.compile(f"{name}-{RE_VERSION}$") if not re_dirname.match(parent.name): return None return Version.parse(parent.name[len(name) + 1 :]) def get_version_from_git(parent): parent = parent.resolve() try: p = run(["git", "rev-parse", "--show-toplevel"], cwd=str(parent), stdout=PIPE, stderr=PIPE, encoding="utf-8", check=True) except (OSError, CalledProcessError): return None if Path(p.stdout.rstrip("\r\n")).resolve() != parent.resolve(): return None p = run(["git", "describe", "--tags", "--dirty", "--always", "--long", "--match", "v[0-9]*"], cwd=str(parent), stdout=PIPE, stderr=PIPE, encoding="utf-8", check=True) release, dev, hex_, dirty = match_groups( f"{RE_GIT_DESCRIBE}$", p.stdout.rstrip("\r\n") ) labels = [] if dev == "0": dev = None else: labels.append(hex_) if dirty and not ON_RTD: labels.append("dirty") return Version(release, dev, labels) def get_version_from_metadata(name: str, parent: Optional[Path] = None): try: from pkg_resources import get_distribution, DistributionNotFound except ImportError: return None try: pkg = get_distribution(name) except DistributionNotFound: return None # For an installed package, the parent is the install location path_pkg = Path(pkg.location).resolve() if parent is not None and path_pkg != parent.resolve(): msg = f"""\ metadata: Failed; distribution and package paths do not match: {path_pkg} != {parent.resolve()}\ """ return None return Version.parse(pkg.version) def get_version(package: Union[Path, str]) -> str: """Get the version of a package or module Pass a module path or package name. The former is recommended, since it also works for not yet installed packages. Supports getting the version from #. The directory name (as created by ``setup.py sdist``) #. The output of ``git describe`` #. 
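
# --- example_datasets_usage.py (illustrative addition, not part of the scvelo sources) ---
# A minimal sketch of how the builtin datasets defined in scvelo/datasets.py are
# typically loaded. It assumes scvelo is installed and the hard-coded download URLs
# above are reachable; downloaded files are cached under ./data/ as in the functions above.
import scvelo as scv

adata = scv.datasets.dentategyrus()       # 10X Dentate Gyrus data, cached as h5ad
toy = scv.datasets.toy_data(n_obs=500)    # random subsample of the Dentate Gyrus data
sim = scv.datasets.simulation(n_obs=300, n_vars=4, noise_model='normal', noise_level=1)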
The package metadata of an installed package (This is the only possibility when passing a name) Args: package: package name or module path (``…/module.py`` or ``…/module/__init__.py``) """ path = Path(package) if not path.suffix and len(path.parts) == 1: # Is probably not a path v = get_version_from_metadata(package) if v: return str(v) if path.suffix != ".py": msg = f"“package” is neither the name of an installed module nor the path to a .py file." if path.suffix: msg += f" Unknown file suffix {path.suffix}" raise ValueError(msg) if path.name == "__init__.py": name = path.parent.name parent = path.parent.parent else: name = path.with_suffix("").name parent = path.parent return str( get_version_from_dirname(name, parent) or get_version_from_git(parent) or get_version_from_metadata(name, parent) or "0.0.0" ) __version__ = get_version(__file__) if __name__ == "__main__": print(__version__) PK:nTNcscvelo/logging.py"""Logging and Profiling """ from . import settings from datetime import datetime from time import time as get_time from platform import python_version _VERBOSITY_LEVELS_FROM_STRINGS = {'error': 0, 'warn': 1, 'info': 2, 'hint': 3} def info(*args, **kwargs): return msg(*args, v='info', **kwargs) def error(*args, **kwargs): args = ('Error:',) + args return msg(*args, v='error', **kwargs) def warn(*args, **kwargs): args = ('WARNING:',) + args return msg(*args, v='warn', **kwargs) def hint(*args, **kwargs): return msg(*args, v='hint', **kwargs) def _settings_verbosity_greater_or_equal_than(v): if isinstance(settings.verbosity, str): settings_v = _VERBOSITY_LEVELS_FROM_STRINGS[settings.verbosity] else: settings_v = settings.verbosity return settings_v >= v def msg(*msg, v=4, time=False, memory=False, reset=False, end='\n', no_indent=False, t=None, m=None, r=None): """Write message to logging output. Log output defaults to standard output but can be set to a file by setting `sc.settings.log_file = 'mylogfile.txt'`. v : {'error', 'warn', 'info', 'hint'} or int, (default: 4) 0/'error', 1/'warn', 2/'info', 3/'hint', 4, 5, 6... time, t : bool, optional (default: False) Print timing information; restart the clock. memory, m : bool, optional (default: Faulse) Print memory information. reset, r : bool, optional (default: False) Reset timing and memory measurement. Is automatically reset when passing one of ``time`` or ``memory``. end : str (default: '\n') Same meaning as in builtin ``print()`` function. no_indent : bool (default: False) Do not indent for ``v >= 4``. """ # variable shortcuts if t is not None: time = t if m is not None: memory = m if r is not None: reset = r if isinstance(v, str): v = _VERBOSITY_LEVELS_FROM_STRINGS[v] if v == 3: # insert "--> " before hints msg = ('-->',) + msg if v >= 4 and not no_indent: msg = (' ',) + msg if _settings_verbosity_greater_or_equal_than(v): if not time and not memory and len(msg) > 0: _write_log(*msg, end=end) if reset: try: settings._previous_memory_usage, _ = get_memory_usage() except: pass settings._previous_time = get_time() if time: elapsed = get_passed_time() msg = msg + ('({})'.format(_sec_to_str(elapsed)),) _write_log(*msg, end=end) if memory: _write_log(get_memory_usage(), end=end) m = msg def _write_log(*msg, end='\n'): """Write message to log output, ignoring the verbosity level. This is the most basic function. Parameters ---------- *msg : One or more arguments to be formatted as string. Same behavior as print function. 
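
# --- example_get_version_usage.py (illustrative addition, not part of the scvelo sources) ---
# get_version() resolves a version string from the sdist directory name, from
# `git describe`, or from the installed package metadata, in that order, and falls
# back to "0.0.0"; the call by name below assumes scvelo is pip-installed.
import scvelo as scv
from scvelo.get_version import get_version

print(scv.__version__)            # set in __init__.py via get_version(__file__)
print(get_version("scvelo"))      # resolved from installed package metadata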
""" from .settings import logfile if logfile == '': print(*msg, end=end) else: out = '' for s in msg: out += str(s) + ' ' with open(logfile, 'a') as f: f.write(out + end) def _sec_to_str(t): """Format time in seconds. Parameters ---------- t : int Time in seconds. """ from functools import reduce return "%d:%02d:%02d.%02d" % \ reduce(lambda ll, b: divmod(ll[0], b) + ll[1:], [(t*100,), 100, 60, 60]) def get_passed_time(): now = get_time() elapsed = now - settings._previous_time settings._previous_time = now return elapsed def print_passed_time(): now = get_time() elapsed = now - settings._previous_time settings._previous_time = now from functools import reduce elapsed = "%d:%02d:%02d.%02d" % reduce(lambda ll, b: divmod(ll[0], b) + ll[1:], [(elapsed*100,), 100, 60, 60]) return elapsed def print_version(): from . import __version__ _write_log('Running scvelo', __version__, '(python ' + python_version() + ')', 'on {}.'.format(get_date_string())) def print_versions(): for mod in ['scvelo', 'scanpy', 'anndata', 'loompy', 'numpy', 'scipy', 'matplotlib', 'sklearn', 'pandas']: mod_name = mod[0] if isinstance(mod, tuple) else mod mod_install = mod[1] if isinstance(mod, tuple) else mod try: print('{}=={}'.format(mod_install, __import__(mod_name).__version__), end=' ') except (ImportError, AttributeError): pass print() def get_date_string(): return datetime.now().strftime("%Y-%m-%d %H:%M") from anndata.logging import print_memory_usage from anndata.logging import get_memory_usage from sys import stdout class ProgressReporter: def __init__(self, total, interval=3): self.count = 0 self.total = total self.timestamp = get_time() self.interval = interval def update(self): self.count += 1 if settings.verbosity > 1 and (get_time() - self.timestamp > self.interval or self.count == self.total): self.timestamp = get_time() percent = int(self.count * 100 / self.total) stdout.write('\r' + '... %d%%' % percent) stdout.flush() def finish(self): if settings.verbosity > 1: stdout.write('\r') stdout.flush() PK(MN/ scvelo/pl.pyfrom scvelo.plotting import * PK(Md## scvelo/pp.pyfrom scvelo.preprocessing import * PK܊>NCscvelo/read_load.pyfrom .preprocessing.utils import set_initial_size import os, re import numpy as np import pandas as pd from urllib.request import urlretrieve from pathlib import Path from scanpy.api import AnnData, read, read_loom def load(filename, backup_url=None, **kwargs): numpy_ext = {'npy', 'npz'} pandas_ext = {'csv', 'txt'} if not os.path.exists(filename) and backup_url is None: raise FileNotFoundError('Did not find file {}.'.format(filename)) elif not os.path.exists(filename): d = os.path.dirname(filename) if not os.path.exists(d): os.makedirs(d) urlretrieve(backup_url, filename) ext = Path(filename).suffixes[-1][1:] if ext in numpy_ext: return np.load(filename, **kwargs) elif ext in pandas_ext: return pd.read_csv(filename, **kwargs) else: raise ValueError('"{}" does not end on a valid extension.\n' 'Please, provide one of the available extensions.\n{}\n' .format(filename, numpy_ext|pandas_ext)) read_csv = load def clean_obs_names(data, base='[AGTCBDHKMNRSVWY]', ID_length=12, copy=False): """Cleans up the obs_names and identifies sample names. For example an obs_name 'samlple1_AGTCdate' is changed to 'AGTC' of the sample 'sample1_date'. The sample name is then saved in obs['sample_batch']. The genetic codes are identified according to according to https://www.neb.com/tools-and-resources/usage-guidelines/the-genetic-code. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. 
base: `str` (default: `[AGTCBDHKMNRSVWY]`) Genetic code letters to be identified. ID_length: `int` (default: 12) Length of the Genetic Codes in the samples. copy: `bool` (default: `False`) Return a copy instead of writing to adata. Returns ------- Returns or updates `adata` with the attributes obs_names: list updated names of the observations sample_batch: `.obs` names of the identified sample batches """ def get_base_list(name, base): base_list = base while re.search(base_list + base, name) is not None: base_list += base if len(base_list) == 0: raise ValueError('Encountered an invalid ID in obs_names: ', name) return base_list adata = data.copy() if copy else data names = adata.obs_names base_list = get_base_list(names[0], base) if len(np.unique([len(name) for name in adata.obs_names])) == 1: start, end = re.search(base_list, names[0]).span() newIDs = [name[start:end] for name in names] start, end = 0, len(newIDs[0]) for i in range(end - ID_length): if np.any([ID[i] not in base for ID in newIDs]): start += 1 if np.any([ID[::-1][i] not in base for ID in newIDs]): end -= 1 newIDs = [ID[start:end] for ID in newIDs] prefixes = [names[i].replace(newIDs[i], '') for i in range(len(names))] else: prefixes, newIDs = [], [] for name in names: match = re.search(base_list, name) newID = re.search(get_base_list(name, base), name).group() if match is None else match.group() newIDs.append(newID) prefixes.append(name.replace(newID, '')) adata.obs_names = newIDs if len(prefixes[0]) > 0 and len(np.unique(prefixes)) > 1: #idx_names = np.random.choice(len(names), size=20, replace=False) #for i in range(len(names[0])): # if np.all([re.search(names[0][:i], names[ix]) for ix in idx_names]) is not None: obs_key = names[0][:i] adata.obs['sample_batch'] = pd.Categorical(prefixes) if len(np.unique(prefixes)) < adata.n_obs else prefixes adata.obs_names_make_unique() return adata if copy else None def merge(adata, ldata, copy=True): """Merges two annotated data matrices. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix (reference data set). ldata: :class:`~anndata.AnnData` Annotated data matrix (to be merged into adata). 
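
# --- example_clean_obs_names_usage.py (illustrative addition, not part of the scvelo sources) ---
# clean_obs_names() strips sample prefixes/suffixes from cell barcodes so that two
# AnnData objects can later be matched on obs_names; the barcodes below are made up.
import numpy as np
from anndata import AnnData
from scvelo.read_load import clean_obs_names

adata = AnnData(np.ones((4, 3)))
adata.obs_names = ['sample1_AAACCTGAGAGC-1', 'sample1_AAACCTGCATCG-1',
                   'sample2_AAACGGGTCAGT-1', 'sample2_AAACGGGAGTAC-1']
clean_obs_names(adata)                        # default ID_length=12
print(adata.obs_names.tolist())               # barcodes only, e.g. ['AAACCTGAGAGC', ...]
print(adata.obs['sample_batch'].tolist())     # leftover parts, e.g. ['sample1_-1', ...]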
Returns ------- Returns a :class:`~anndata.AnnData` object """ if 'spliced' in ldata.layers.keys() and 'initial_size_spliced' not in ldata.obs.keys(): set_initial_size(ldata) elif 'spliced' in adata.layers.keys() and 'initial_size_spliced' not in adata.obs.keys(): set_initial_size(adata) common_obs = adata.obs_names.intersection(ldata.obs_names) common_vars = adata.var_names.intersection(ldata.var_names) if len(common_obs) == 0: clean_obs_names(adata) clean_obs_names(ldata) common_obs = adata.obs_names.intersection(ldata.obs_names) if copy: _adata = adata[common_obs].copy() if adata.shape[1] >= ldata.shape[1] else ldata[common_obs].copy() _ldata = ldata[common_obs].copy() if adata.shape[1] >= ldata.shape[1] else adata[common_obs].copy() else: adata._inplace_subset_obs(common_obs) _adata, _ldata = adata, ldata[common_obs] same_vars = (len(_adata.var_names) == len(_ldata.var_names) and np.all(_adata.var_names == _ldata.var_names)) if len(common_vars) > 0 and not same_vars: _adata._inplace_subset_var(common_vars) _ldata._inplace_subset_var(common_vars) for attr in _ldata.obs.keys(): _adata.obs[attr] = _ldata.obs[attr] for attr in _ldata.obsm.keys(): _adata.obsm[attr] = _ldata.obsm[attr] for attr in _ldata.uns.keys(): _adata.uns[attr] = _ldata.uns[attr] for attr in _ldata.layers.keys(): _adata.layers[attr] = _ldata.layers[attr] if _adata.shape[1] == _ldata.shape[1]: same_vars = (len(_adata.var_names) == len(_ldata.var_names) and np.all(_adata.var_names == _ldata.var_names)) if same_vars: for attr in _ldata.var.keys(): _adata.var[attr] = _ldata.var[attr] for attr in _ldata.varm.keys(): _adata.varm[attr] = _ldata.varm[attr] else: raise ValueError('Variable names are not identical.') return _adata if copy else None PK|bNE$E$scvelo/settings.py"""Settings """ # set global verbosity level to show errors(0), warnings(1), info(2) and hints(3) verbosity = 3 plot_suffix = '' """Global suffix that is appended to figure filenames. """ file_format_data = 'h5ad' """File format for saving AnnData objects. Allowed are 'txt', 'csv' (comma separated value file) for exporting and 'h5ad' (hdf5) for lossless saving. """ file_format_figs = 'pdf' """File format for saving figures. For example 'png', 'pdf' or 'svg'. Many other formats work as well (see `matplotlib.pyplot.savefig`). """ autosave = False """Save plots/figures as files in directory 'figs'. Do not show plots/figures interactively. """ autoshow = True """Show all plots/figures automatically if autosave == False. There is no need to call the matplotlib pl.show() in this case. """ writedir = './write/' """Directory where the function scanpy.write writes to by default. """ cachedir = './cache/' """Default cache directory. """ figdir = './figures/' """Directory where plots are saved. """ max_memory = 15 """Maximal memory usage in Gigabyte. Is currently not well respected.... """ n_jobs = 1 """Default number of jobs/ CPUs to use for parallel computing. """ logfile = '' """Name of logfile. By default is set to '' and writes to standard output.""" categories_to_ignore = ['N/A', 'dontknow', 'no_gate', '?'] """Categories that are omitted in plotting etc. """ _frameon = False """See set_figure_params. """ _rcParams_style = None """See set_figure_params. """ # -------------------------------------------------------------------------------- # Functions # -------------------------------------------------------------------------------- from matplotlib import rcParams, cm, colors from cycler import cycler # default matplotlib 2.0 palette slightly modified. 
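
# --- example_merge_usage.py (illustrative addition, not part of the scvelo sources) ---
# merge() intersects two AnnData objects on common obs_names/var_names and copies
# layers, obs and obsm over, e.g. to attach velocyto spliced/unspliced counts to an
# already processed object. The file names below are placeholders.
import scvelo as scv

adata = scv.read('processed_counts.h5ad')        # existing annotated data (placeholder path)
ldata = scv.read_loom('velocyto_output.loom')    # loom with spliced/unspliced layers (placeholder path)
adata = scv.utils.merge(adata, ldata)            # falls back to clean_obs_names if no cells match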
vega_10 = list(map(colors.to_hex, cm.tab10.colors)) vega_20 = list(map(colors.to_hex, cm.tab20.colors)) vega_20 = [*vega_20[0:14:2], *vega_20[16::2], *vega_20[1:15:2], *vega_20[17::2], '#ad494a', '#8c6d31'] vega_20[2], vega_20[4], vega_20[7] = '#279e68', '#aa40fc', '#b5bd61' # green, purple, khaki def set_rcParams_scvelo(fontsize=8, color_map=None, frameon=None): """Set matplotlib.rcParams to scvelo defaults.""" # dpi options (mpl default: 100, 100) rcParams['figure.dpi'] = 100 rcParams['savefig.dpi'] = 150 # figure (mpl default: 0.125, 0.96, 0.15, 0.91) rcParams['figure.figsize'] = (7, 5) rcParams['figure.subplot.left'] = 0.18 rcParams['figure.subplot.right'] = 0.96 rcParams['figure.subplot.bottom'] = 0.15 rcParams['figure.subplot.top'] = 0.91 # lines (defaults: 1.5, 6, 1) rcParams['lines.linewidth'] = 1.5 # the line width of the frame rcParams['lines.markersize'] = 6 rcParams['lines.markeredgewidth'] = 1 # font rcParams['font.sans-serif'] = \ ['Arial', 'Helvetica', 'DejaVu Sans', 'Bitstream Vera Sans', 'sans-serif'] fontsize = fontsize labelsize = 0.92 * fontsize # fonsizes (mpl default: 10, medium, large, medium) rcParams['font.size'] = fontsize rcParams['legend.fontsize'] = labelsize rcParams['axes.titlesize'] = fontsize rcParams['axes.labelsize'] = labelsize # legend (mpl default: 1, 1, 2, 0.8) rcParams['legend.numpoints'] = 1 rcParams['legend.scatterpoints'] = 1 rcParams['legend.handlelength'] = 0.5 rcParams['legend.handletextpad'] = 0.4 # color cycle rcParams['axes.prop_cycle'] = cycler(color=vega_10) # axes rcParams['axes.linewidth'] = 0.8 rcParams['axes.edgecolor'] = 'black' rcParams['axes.facecolor'] = 'white' # ticks (mpl default: k, k, medium, medium) rcParams['xtick.color'] = 'k' rcParams['ytick.color'] = 'k' rcParams['xtick.labelsize'] = labelsize rcParams['ytick.labelsize'] = labelsize # axes grid (mpl default: False, #b0b0b0) rcParams['axes.grid'] = False rcParams['grid.color'] = '.8' # color map rcParams['image.cmap'] = 'RdBu_r' if color_map is None else color_map # frame (mpl default: True) frameon = False if frameon is None else frameon global _frameon _frameon = frameon def set_rcParams_scanpy(fontsize=14, color_map=None, frameon=None): """Set matplotlib.rcParams to Scanpy defaults.""" # dpi options rcParams['figure.dpi'] = 100 rcParams['savefig.dpi'] = 150 # figure rcParams['figure.figsize'] = (4, 4) rcParams['figure.subplot.left'] = 0.18 rcParams['figure.subplot.right'] = 0.96 rcParams['figure.subplot.bottom'] = 0.15 rcParams['figure.subplot.top'] = 0.91 rcParams['lines.linewidth'] = 1.5 # the line width of the frame rcParams['lines.markersize'] = 6 rcParams['lines.markeredgewidth'] = 1 # font rcParams['font.sans-serif'] = [ 'Arial', 'Helvetica', 'DejaVu Sans', 'Bitstream Vera Sans', 'sans-serif'] fontsize = fontsize labelsize = 0.92 * fontsize rcParams['font.size'] = fontsize rcParams['legend.fontsize'] = labelsize rcParams['axes.titlesize'] = fontsize rcParams['axes.labelsize'] = fontsize # legend rcParams['legend.numpoints'] = 1 rcParams['legend.scatterpoints'] = 1 rcParams['legend.handlelength'] = 0.5 rcParams['legend.handletextpad'] = 0.4 # color cycle rcParams['axes.prop_cycle'] = cycler(color=vega_20) # lines rcParams['axes.linewidth'] = 0.8 rcParams['axes.edgecolor'] = 'black' rcParams['axes.facecolor'] = 'white' # ticks rcParams['xtick.color'] = 'k' rcParams['ytick.color'] = 'k' rcParams['xtick.labelsize'] = fontsize rcParams['ytick.labelsize'] = fontsize # axes grid rcParams['axes.grid'] = True rcParams['grid.color'] = '.8' # color map rcParams['image.cmap'] 
= rcParams['image.cmap'] if color_map is None else color_map # frame frameon = True if frameon is None else frameon global _frameon _frameon = frameon def set_figure_params(style='scvelo', figsize=None, dpi=None, dpi_save=None, frameon=None, vector_friendly=True, color_map=None, format='pdf', transparent=False, ipython_format='png2x'): """Set resolution/size, styling and format of figures. Arguments --------- style : `str` (default: `None`) Init default values for ``matplotlib.rcParams`` suited for `scvelo` or `scanpy`. Use `None` for the default matplotlib values. figsize: `[float, float]` (default: `None`) Width and height for default figure size. dpi : `int` (default: `None`) Resolution of rendered figures - this influences the size of figures in notebooks. dpi_save : `int` (default: `None`) Resolution of saved figures. This should typically be higher to achieve publication quality. frameon : `bool` (default: `None`) Add frames and axes labels to scatter plots. vector_friendly : `bool` (default: `True`) Plot scatter plots using `png` backend even when exporting as `pdf` or `svg`. color_map : `str` (default: `None`) Convenience method for setting the default color map. format : {'png', 'pdf', 'svg', etc.} (default: 'pdf') This sets the default format for saving figures: `file_format_figs`. transparent : `bool` (default: `True`) Save figures with transparent back ground. Sets `rcParams['savefig.transparent']`. ipython_format : list of `str` (default: 'png2x') Only concerns the notebook/IPython environment; see `IPython.core.display.set_matplotlib_formats` for more details. """ try: import IPython IPython.core.display.set_matplotlib_formats(ipython_format) except: pass from matplotlib import rcParams global _rcParams_style _rcParams_style = style global _vector_friendly _vector_friendly = vector_friendly global file_format_figs file_format_figs = format if transparent is not None: rcParams['savefig.transparent'] = transparent if style is 'scvelo': set_rcParams_scvelo(color_map=color_map, frameon=frameon) elif style is 'scanpy': set_rcParams_scanpy(color_map=color_map, frameon=frameon) # Overwrite style options if given if figsize is not None: rcParams['figure.figsize'] = figsize if dpi is not None: rcParams['figure.dpi'] = dpi if dpi_save is not None: rcParams['savefig.dpi'] = dpi_save def set_rcParams_defaults(): """Reset `matplotlib.rcParams` to defaults.""" from matplotlib import rcParamsDefault rcParams.update(rcParamsDefault) # ------------------------------------------------------------------------------ # Private global variables & functions # ------------------------------------------------------------------------------ _vector_friendly = False """Set to true if you want to include pngs in svgs and pdfs. 
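
# --- example_figure_params_usage.py (illustrative addition, not part of the scvelo sources) ---
# set_figure_params() switches matplotlib rcParams to the 'scvelo' or 'scanpy'
# styles defined above; set_rcParams_defaults() restores plain matplotlib.
import scvelo as scv

scv.settings.set_figure_params('scvelo', dpi=80, dpi_save=200, frameon=False,
                               color_map='viridis', format='png', transparent=True)
scv.settings.set_rcParams_defaults()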
""" _low_resolution_warning = True """Print warning when saving a figure with low resolution.""" def _set_start_time(): from time import time return time() _start = _set_start_time() """Time when the settings module is first imported.""" _previous_time = _start """Variable for timing program parts.""" _previous_memory_usage = -1 """Stores the previous memory usage.""" PK(ME scvelo/tl.pyfrom scvelo.tools import * PKcyNGh%%scvelo/utils.pyfrom .preprocessing.utils import show_proportions, cleanup, set_initial_size, get_initial_size from .preprocessing.neighbors import get_connectivities from .preprocessing.moments import second_order_moments, second_order_moments_u from .tools.utils import * from .tools.rank_velocity_genes import get_mean_var from .tools.run import convert_to_adata, convert_to_loom from .tools.optimization import leastsq_NxN, leastsq_generalized, maximum_likelihood, get_weight from .tools.velocity_confidence import random_subsample from .tools.velocity_graph import vals_to_csr from .plotting.utils import is_categorical, clip, interpret_colorkey from .plotting.velocity_embedding_grid import compute_velocity_on_grid from .plotting.simulation import compute_dynamics from .read_load import clean_obs_names, merge PKCN scvelo/plotting/__init__.pyfrom .scatter import scatter from .velocity import velocity from .velocity_embedding import velocity_embedding from .velocity_embedding_grid import velocity_embedding_grid from .velocity_embedding_stream import velocity_embedding_stream from .velocity_graph import velocity_graph from .heatmap import heatmap from .utils import hist, plot from .simulation import simulation from scanpy.api.pl import paga, paga_compare, rank_genes_groups PK tNڽ scvelo/plotting/docs.py"""Shared docstrings for plotting function parameters. """ from textwrap import dedent def doc_params(**kwds): """\ Docstrings should start with "\" in the first line for proper formatting. """ def dec(obj): obj.__doc__ = dedent(obj.__doc__).format(**kwds) return obj return dec doc_scatter = """\ basis: `str` (default='umap') Key for embedding. color: `str`, list of `str` or `None` (default: `None`) Key for annotations of observations/cells or variables/genes use_raw : `bool` (default: `None`) Use `raw` attribute of `adata` if present. layer: `str`, list of `str` or `None` (default: `None`) Specify the layer for `color`. color_map: `str` (default: `matplotlib.rcParams['image.cmap']`) String denoting matplotlib color map. colorbar: `bool` (default: `False`) Whether to show colorbar. palette: list of `str` (default: `None`) Colors to use for plotting groups (categorical annotation). size: `float` (default: 5) Point size. alpha: `float` (default: 1) Set blending - 0 transparent to 1 opaque. linewidth: `float` (default: 1) Scaling factor for the width of occurring lines. perc: tuple, e.g. [2,98] (default: `None`) Specify percentile for continuous coloring. sort_order: `bool` (default: `True`) For continuous annotations used as color parameter, plot data points with higher values on top of others. groups: `str` (default: `all groups`) Restrict to a few categories in categorical observation annotation. components: `str` or list of `str` (default: '1,2') For instance, ['1,2', '2,3']. projection: {'2d', '3d'} (default: '2d') Projection of plot. legend_loc: str (default: 'none') Location of legend, either 'on data', 'right margin' or valid keywords for matplotlib.legend. legend_fontsize: `int` (default: `None`) Legend font size. 
legend_fontweight: {'normal', 'bold', ...} (default: `None`) Legend font weight. Defaults to 'bold' if `legend_loc = 'on data'`, otherwise to 'normal'. Available are `['light', 'normal', 'medium', 'semibold', 'bold', 'heavy', 'black']`. right_margin: `float` or list of `float` (default: `None`) Adjust the width of the space right of each plotting panel. left_margin: `float` or list of `float` (default: `None`) Adjust the width of the space left of each plotting panel. xlabel: `str` (default: `None`) Label of x-axis. ylabel: `str` (default: `None`) Label of y-axis. title: `str` (default: `None`) Provide title for panels either as, e.g. `["title1", "title2", ...]`. fontsize: `float` (default: `None`) Label font size. figsize: tuple (default: `(7,5)`) Figure size. dpi: `int` (default: 80) Figure dpi. frameon: `bool` (default: `True`) Draw a frame around the scatter plot. ncols : `int` (default: `None`) Number of panels per row. show: `bool`, optional (default: `None`) Show the plot, do not return axis. save: `bool` or `str`, optional (default: `None`) If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {'.pdf', '.png', '.svg'}. ax: `matplotlib.Axes`, optional (default: `None`) A matplotlib axes object. Only works if plotting a single component.\ """PKMk4ND@@scvelo/plotting/heatmap.pyimport numpy as np import matplotlib.pyplot as pl from matplotlib import rcParams from matplotlib.colors import ColorConverter from pandas import unique from scipy.sparse import issparse from .utils import is_categorical, interpret_colorkey, default_basis, default_size, get_components, savefig_or_show, \ default_color, make_unique_list, set_colorbar, default_color_map, set_label, set_title def heatmap(adata, var_names, groups=None, groupby=None, annotations=None, use_raw=False, layers=['X'], color_map=None, color_map_anno=None, colorbar=True, row_width=None, xlabel=None, title=None, figsize=None, dpi=None, show=True, save=None, ax=None, **kwargs): """\ Plot pseudotimeseries for genes as heatmap. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. var_names: `str`, list of `str` Names of variables to use for the plot. groups: `str`, list of `str` or `None` (default: `None`) Groups selected to plot. Must be an element of adata.obs[groupby]. groupby: `str` or `None` (default: `None`) Key in adata.obs. Indicates how to group the plot. annotations: `str`, list of `str` or `None` (default: `None`) Key in adata.obs. Annotations are plotted in the last row. use_raw: `bool` (default: `False`) If true, moments are used instead of raw data. layers: `str`, list of `str` or `None` (default: `['X']`) Selected layers. color_map: `str`, list of `str` or `None` (default: `None`) String denoting matplotlib color map for the heat map. There must be one list entry for each layer. color_map_anno: `str`, list of `str` or `None` (default: `None`) String denoting matplotlib color map for the annotations. There must be one list entry for each annotation. colorbar: `bool` (default: `True`) If True, a colormap for each layer is added on the right bottom corner. row_width: `float` (default: `None`) Constant width of all rows. xlabel: Label for the x-axis. title: `str` or `None` (default: `None`) Main plot title. figsize: tuple (default: `(7,5)`) Figure size. dpi: `int` (default: 80) Figure dpi. show: `bool`, optional (default: `None`) Show the plot, do not return axis. 
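
# --- example_doc_params_usage.py (illustrative addition, not part of the scvelo sources) ---
# The doc_params decorator above fills `{scatter}` placeholders in a docstring with
# the shared doc_scatter text, so the scatter keywords are documented only once.
from scvelo.plotting.docs import doc_params, doc_scatter

@doc_params(scatter=doc_scatter)
def my_plot(adata, **kwargs):
    """\
    Minimal plotting wrapper (hypothetical example function).

    Arguments
    ---------
    {scatter}
    """

print(my_plot.__doc__[:200])   # shared parameter docs are substituted into the docstring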
save: `bool` or `str`, optional (default: `None`) If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {'.pdf', '.png', '.svg'}. ax: `matplotlib.Axes`, optional (default: `None`) A matplotlib axes object. Only works if plotting a single component. Returns ------- If `show==False` a `matplotlib.Axis` """ # catch if 'velocity_pseudotime' not in adata.obs.keys(): raise ValueError( 'A function requires computation of the pseudotime' 'for ordering at single-cell resolution') if annotations is None: annotations = [] if isinstance(var_names, str): var_names = [var_names] if len(var_names) == 0: var_names = np.arange(adata.X.shape[1]) if var_names.ndim == 2: var_names = var_names[:, 0] var_names = [name for name in var_names if name in adata.var_names] if len(var_names) == 0: raise ValueError( 'The specified var_names are all not' 'contained in the adata.var_names.') if layers is None: layers = ['X'] if isinstance(layers, str): layers = [layers] layers = [layer for layer in layers if layer in adata.layers.keys() or layer == 'X'] if len(layers) == 0: raise ValueError( 'The selected layers are not contained' 'in adata.layers.keys().') if not use_raw: layers = np.array(layers) if 'X' in layers: layers[np.array([layer == 'X' for layer in layers])] = 'Ms' if 'spliced' in layers: layers[np.array([layer == 'spliced' for layer in layers])] = 'Ms' if 'unspliced' in layers: layers[np.array([layer == 'unspliced' for layer in layers])] = 'Ms' layers = list(layers) if 'Ms' in layers and 'Ms' not in adata.layers.keys(): raise ValueError( 'Moments have to be computed before' 'using this plot function.') if 'Mu' in layers and 'Mu' not in adata.layers.keys(): raise ValueError( 'Moments have to be computed before' 'using this plot function.') layers = unique(layers) # Number of rows to plot tot_len = len(var_names) * len(layers) + len(annotations) # init main figure figsize = rcParams['figure.figsize'] if figsize is None else figsize if row_width is not None: figsize[1] = row_width * tot_len ax = pl.figure(figsize=figsize, dpi=dpi).gca() if ax is None else ax ax.set_yticks([]) ax.set_xticks([]) # groups bar ax_bounds = ax.get_position().bounds if groupby is not None: # catch if groupby not in adata.obs_keys(): raise ValueError( 'The selected groupby is not contained' 'in adata.obs_keys().') if groups is None: # Then use everything of that obs groups = unique(adata.obs.clusters.values) imlist = [] for igroup, group in enumerate(groups): for ivar, var in enumerate(var_names): for ilayer, layer in enumerate(layers): groups_axis = pl.axes([ax_bounds[0] + igroup * ax_bounds[2] / len(groups), ax_bounds[1] + ax_bounds[3] * ( tot_len - ivar * len(layers) - ilayer - 1) / tot_len, ax_bounds[2] / len(groups), (ax_bounds[3] - ax_bounds[3] / tot_len * len(annotations)) / ( len(var_names) * len(layers))]) # Get data to fill and reshape dat = adata[:, var] idx_group = [adata.obs[groupby] == group] idx_group = np.array(idx_group[0].tolist()) idx_var = [vn in var_names for vn in adata.var_names] idx_pt = np.array(adata.obs.velocity_pseudotime).argsort() idx_pt = idx_pt[np.array(np.isnan(np.array(dat.obs.velocity_pseudotime)[idx_pt]) == False)] if layer == 'X': laydat = dat.X else: laydat = dat.layers[layer] t1, t2, t3 = idx_group, idx_var, idx_pt t1 = t1[t3] # laydat = laydat[:, t2] # select vars laydat = laydat[t3] laydat = laydat[t1] # select ordered groups if issparse(laydat): laydat = laydat.A # transpose X for ordering in direction var_names: up->downwards laydat 
= laydat.T[::-1] laydat = laydat.reshape((1, len(laydat))) # ensure 1dimty # plot im = groups_axis.imshow(laydat, aspect='auto', interpolation="nearest", cmap=color_map[ilayer]) # Frames if ilayer == 0: groups_axis.spines['bottom'].set_visible(False) elif ilayer == len(layer) - 1: groups_axis.spines['top'].set_visible(False) else: groups_axis.spines['top'].set_visible(False) groups_axis.spines['bottom'].set_visible(False) # Further visuals if igroup == 0: if colorbar: if len(layers) % 2 == 0: if ilayer == len(layers) / 2 - 1: pl.yticks([0.5], [var]) else: groups_axis.set_yticks([]) else: if ilayer == (len(layers) - 1) / 2: pl.yticks([0], [var]) else: groups_axis.set_yticks([]) else: pl.yticks([0], [layer + ' ' + var]) else: groups_axis.set_yticks([]) groups_axis.set_xticks([]) if ilayer == 0 and ivar == 0: groups_axis.set_title(str(group)) groups_axis.grid(False) # handle needed as mappable for colorbar if igroup == len(groups) - 1: imlist.append(im) # further annotations for each group if annotations is not None: for ianno, anno in enumerate(annotations): anno_axis = pl.axes([ax_bounds[0] + igroup * ax_bounds[2] / len(groups), ax_bounds[1] + ax_bounds[3] / tot_len * (len(annotations) - ianno - 1), ax_bounds[2] / len(groups), ax_bounds[3] / tot_len]) if is_categorical(adata, anno): colo = interpret_colorkey(adata, anno)[t3][t1] colo.reshape(1, len(colo)) mapper = np.vectorize(ColorConverter.to_rgb) a = mapper(colo) a = np.array(a).T Y = a.reshape(1, len(colo), 3) else: Y = np.array(interpret_colorkey(adata, anno))[t3][t1] Y = Y.reshape(1, len(Y)) img = anno_axis.imshow(Y, aspect='auto', interpolation='nearest', cmap=color_map_anno) if igroup == 0: anno_axis.set_yticklabels(['', anno, '']) # , fontsize=ytick_fontsize) anno_axis.tick_params(axis='both', which='both', length=0) else: anno_axis.set_yticklabels([]) anno_axis.set_yticks([]) anno_axis.set_xticks([]) anno_axis.set_xticklabels([]) anno_axis.grid(False) pl.ylim([.5, -.5]) # center ticks else: # groupby is False imlist = [] for ivar, var in enumerate(var_names): for ilayer, layer in enumerate(layers): ax_bounds = ax.get_position().bounds groups_axis = pl.axes([ax_bounds[0], ax_bounds[1] + ax_bounds[3] * ( tot_len - ivar * len(layers) - ilayer - 1) / tot_len, ax_bounds[2], (ax_bounds[3] - ax_bounds[3] / tot_len * len(annotations)) / ( len(var_names) * len(layers))]) # Get data to fill dat = adata[:, var] idx = np.array(dat.obs.velocity_pseudotime).argsort() idx = idx[np.array(np.isnan(np.array(dat.obs.velocity_pseudotime)[idx]) == False)] if layer == 'X': laydat = dat.X else: laydat = dat.layers[layer] laydat = laydat[idx] if issparse(laydat): laydat = laydat.A # transpose X for ordering in direction var_names: up->downwards laydat = laydat.T[::-1] laydat = laydat.reshape((1, len(laydat))) # plot im = groups_axis.imshow(laydat, aspect='auto', interpolation="nearest", cmap=color_map[ilayer]) imlist.append(im) # Frames if ilayer == 0: groups_axis.spines['bottom'].set_visible(False) elif ilayer == len(layer) - 1: groups_axis.spines['top'].set_visible(False) else: groups_axis.spines['top'].set_visible(False) groups_axis.spines['bottom'].set_visible(False) # Further visuals groups_axis.set_xticks([]) groups_axis.grid(False) pl.ylim([.5, -.5]) # center if colorbar: if len(layers) % 2 == 0: if ilayer == len(layers) / 2 - 1: pl.yticks([0.5], [var]) else: groups_axis.set_yticks([]) else: if ilayer == (len(layers) - 1) / 2: pl.yticks([0], [var]) else: groups_axis.set_yticks([]) else: pl.yticks([0], [layer + ' ' + var]) # further annotations 
bars if annotations is not None: for ianno, anno in enumerate(annotations): anno_axis = pl.axes([ax_bounds[0], ax_bounds[1] + ax_bounds[3] / tot_len * (len(annotations) - ianno - 1), ax_bounds[2], ax_bounds[3] / tot_len]) dat = adata[:, var_names] if is_categorical(dat, anno): colo = interpret_colorkey(dat, anno)[idx] colo.reshape(1, len(colo)) mapper = np.vectorize(ColorConverter.to_rgb) a = mapper(colo) a = np.array(a).T Y = a.reshape(1, len(idx), 3) else: Y = np.array(interpret_colorkey(dat, anno)[idx]).reshape(1, len(idx)) img = anno_axis.imshow(Y, aspect='auto', interpolation='nearest', cmap=color_map_anno) anno_axis.set_yticklabels(['', anno, '']) # , fontsize=ytick_fontsize) anno_axis.tick_params(axis='both', which='both', length=0) anno_axis.grid(False) anno_axis.set_xticks([]) anno_axis.set_xticklabels([]) pl.ylim([-.5, +.5]) # Colorbar if colorbar: if len(layers) > 1: # I must admit, this part is chaotic for ilayer, layer in enumerate(layers): w = 0.015 * 10 / figsize[0] # 0.02 * ax_bounds[2] x = ax_bounds[0] + ax_bounds[2] * 0.99 + 1.5 * w + w * 1.2 * ilayer y = ax_bounds[1] h = ax_bounds[3] * .3 cbaxes = pl.axes([x, y, w, h]) cb = pl.colorbar(mappable=imlist[ilayer], cax=cbaxes) pl.text(x - 40 * w, y + h * 4, layer, rotation=45, horizontalalignment='left', verticalalignment='bottom') if ilayer == len(layers) - 1: ext = abs(cb.vmin - cb.vmax) cb.set_ticks([cb.vmin + 0.07 * ext, cb.vmax - 0.07 * ext]) cb.ax.set_yticklabels(['Low', 'High']) # vertical colorbar else: cb.set_ticks([]) else: cbaxes = pl.axes([ax_bounds[0] + ax_bounds[2] + .01, ax_bounds[1], 0.02, ax_bounds[3] * .3]) cb = pl.colorbar(mappable=im, cax=cbaxes) cb.set_ticks([cb.vmin, cb.vmax]) cb.ax.set_yticklabels(['Low', 'High']) if xlabel is None: xlabel = 'velocity' + ' ' + 'pseudotime' if title is not None: ax.set_title(title, pad=30) if len(annotations) == 0: ax.set_xlabel(xlabel) ax.xaxis.labelpad = 20 # set_label(xlabel, None, fontsize, basis) # set_title(title, None, None, fontsize) # ax = update_axes(ax, fontsize) savefig_or_show('heatmap', dpi=dpi, save=save, show=show) if not show: return axPKTp4N51  scvelo/plotting/palettes.py"""Color palettes in addition to matplotlib's palettes.""" from matplotlib import cm, colors # Colorblindness adjusted vega_10 # See https://github.com/theislab/scanpy/issues/387 vega_10 = list(map(colors.to_hex, cm.tab10.colors)) vega_10_scanpy = vega_10.copy() vega_10_scanpy[2] = '#279e68' # green vega_10_scanpy[4] = '#aa40fc' # purple vega_10_scanpy[8] = '#b5bd61' # kakhi # default matplotlib 2.0 palette # see 'category20' on https://github.com/vega/vega/wiki/Scales#scale-range-literals vega_20 = list(map(colors.to_hex, cm.tab20.colors)) # reorderd, some removed, some added vega_20_scanpy = [ *vega_20[0:14:2], *vega_20[16::2], # dark without grey *vega_20[1:15:2], *vega_20[17::2], # light without grey '#ad494a', '#8c6d31', # manual additions ] vega_20_scanpy[2] = vega_10_scanpy[2] vega_20_scanpy[4] = vega_10_scanpy[4] vega_20_scanpy[7] = vega_10_scanpy[8] # kakhi shifted by missing grey default_20 = vega_20_scanpy # https://graphicdesign.stackexchange.com/questions/3682/where-can-i-find-a-large-palette-set-of-contrasting-colors-for-coloring-many-d # update 1 # orig reference http://epub.wu.ac.at/1692/1/document.pdf zeileis_26 = [ "#023fa5", "#7d87b9", "#bec1d4", "#d6bcc0", "#bb7784", "#8e063b", "#4a6fe3", "#8595e1", "#b5bbe3", "#e6afb9", "#e07b91", "#d33f6a", "#11c638", "#8dd593", "#c6dec7", "#ead3c6", "#f0b98d", "#ef9708", "#0fcfc0", "#9cded6", "#d5eae7", "#f3e1eb", "#f6c4e1", 
"#f79cd4", '#7f7f7f', "#c7c7c7", "#1CE6FF", "#336600" # these last ones were added, ] default_26 = zeileis_26 # from http://godsnotwheregodsnot.blogspot.de/2012/09/color-distribution-methodology.html godsnot_64 = [ # "#000000", # remove the black, as often, we have black colored annotation "#FFFF00", "#1CE6FF", "#FF34FF", "#FF4A46", "#008941", "#006FA6", "#A30059", "#FFDBE5", "#7A4900", "#0000A6", "#63FFAC", "#B79762", "#004D43", "#8FB0FF", "#997D87", "#5A0007", "#809693", "#FEFFE6", "#1B4400", "#4FC601", "#3B5DFF", "#4A3B53", "#FF2F80", "#61615A", "#BA0900", "#6B7900", "#00C2A0", "#FFAA92", "#FF90C9", "#B903AA", "#D16100", "#DDEFFF", "#000035", "#7B4F4B", "#A1C299", "#300018", "#0AA6D8", "#013349", "#00846F", "#372101", "#FFB500", "#C2FFED", "#A079BF", "#CC0744", "#C0B9B2", "#C2FF99", "#001E09", "#00489C", "#6F0062", "#0CBD66", "#EEC3FF", "#456D75", "#B77B68", "#7A87A1", "#788D66", "#885578", "#FAD09F", "#FF8A9A", "#D157A0", "#BEC459", "#456648", "#0086ED", "#886F4C", "#34362D", "#B4A8BD", "#00A6AA", "#452C2C", "#636375", "#A3C8C9", "#FF913F", "#938A81", "#575329", "#00FECF", "#B05B6F", "#8CD0FF", "#3B9700", "#04F757", "#C8A1A1", "#1E6E00", "#7900D7", "#A77500", "#6367A9", "#A05837", "#6B002C", "#772600", "#D790FF", "#9B9700", "#549E79", "#FFF69F", "#201625", "#72418F", "#BC23FF", "#99ADC0", "#3A2465", "#922329", "#5B4534", "#FDE8DC", "#404E55", "#0089A3", "#CB7E98", "#A4E804", "#324E72", "#6A3A4C"] default_64 = godsnot_64 from typing import Mapping, Sequence def _plot_color_cylce(clists: Mapping[str, Sequence[str]]): import numpy as np import matplotlib.pyplot as plt from matplotlib.colors import ListedColormap, BoundaryNorm fig, axes = plt.subplots(nrows=len(clists)) # type: plt.Figure, plt.Axes fig.subplots_adjust(top=.95, bottom=.01, left=.3, right=.99) axes[0].set_title('Color Maps/Cycles', fontsize=14) for ax, (name, clist) in zip(axes, clists.items()): n = len(clist) ax.imshow( np.arange(n)[None, :].repeat(2, 0), aspect='auto', cmap=ListedColormap(clist), norm=BoundaryNorm(np.arange(n+1)-.5, n), ) pos = list(ax.get_position().bounds) x_text = pos[0] - .01 y_text = pos[1] + pos[3] / 2. 
fig.text(x_text, y_text, name, va='center', ha='right', fontsize=10) # Turn off all ticks & spines for ax in axes: ax.set_axis_off() fig.show() if __name__ == '__main__': _plot_color_cylce({ name: colors for name, colors in globals().items() if isinstance(colors, list) })PK Mv77scvelo/plotting/pseudotime.pyfrom .utils import * def principal_curve(adata): X_curve = adata.uns['principal_curve']['projections'] ixsort = adata.uns['principal_curve']['ixsort'] pl.plot(X_curve[ixsort, 0], X_curve[ixsort, 1], c="k", lw=3, zorder=2000000) def pseudotime(adata, gene_list, ckey='velocity', reverse=False): ixsort = adata.uns['principal_curve']['ixsort'] arclength = adata.uns['principal_curve']['arclength'] if reverse: arclength /= np.max(arclength) else: arclength = (np.max(arclength) - arclength) / np.max(arclength) cell_subset = adata.uns['principal_curve']['cell_subset'] adata_subset = adata[cell_subset].copy() gs = pl.GridSpec(1, len(gene_list)) for n, gene in enumerate(gene_list): i = np.where(adata_subset.var_names == gene)[0][0] ax = pl.subplot(gs[n]) lb, ub = np.percentile(adata_subset.obsm[ckey][:, i], [.5, 99.5]) c = np.clip(adata_subset.obsm[ckey][ixsort, i], lb, ub) # pl.scatter(arclength[ixsort], adata2.obsm['Mu'][ixsort, i], alpha=0.7, c='b', s=5, label="unspliced") pl.scatter(arclength[ixsort], adata_subset.obsm['Ms'][ixsort, i] * adata_subset.uns['velocity_pars']['gamma'][i], c=c, cmap='coolwarm', alpha=1, s=1, label="spliced") c = c / abs(c).max() * (adata_subset.obsm['Ms'][ixsort, i] * adata_subset.uns['velocity_pars']['gamma'][i]).max() z = np.ma.polyfit(arclength[ixsort], c, 8) fun = np.poly1d(z) pl.plot(arclength[ixsort], fun(arclength[ixsort]), label=ckey) # Hide the right and top spines ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) ax.xaxis.set_major_locator(MaxNLocator(nbins=3)) ax.yaxis.set_major_locator(MaxNLocator(nbins=3)) pl.ylabel(gene) pl.title('Colored by ' + ckey) PKwNorH""scvelo/plotting/scatter.pyfrom .. import settings from .. import AnnData from .utils import make_dense, is_categorical, update_axes, set_label, set_title, interpret_colorkey, set_colorbar, \ default_basis, default_color, default_size, default_color_map, get_components, savefig_or_show, make_unique_list, \ show_linear_fit, show_density from .docs import doc_scatter, doc_params from matplotlib import rcParams import matplotlib.pyplot as pl import numpy as np import pandas as pd @doc_params(scatter=doc_scatter) def scatter(adata=None, x=None, y=None, basis=None, vkey=None, color=None, use_raw=None, layer=None, color_map=None, colorbar=True, palette=None, size=None, alpha=None, linewidth=None, perc=None, sort_order=True, groups=None, components=None, projection='2d', legend_loc='none', legend_fontsize=None, legend_fontweight=None, right_margin=None, left_margin=None, xlabel=None, ylabel=None, title=None, fontsize=None, figsize=None, dpi=None, frameon=None, density=None, linear_fit=None, show=True, save=None, ax=None, zorder=None, ncols=None, **kwargs): """\ Scatter plot along observations or variables axes. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. 
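
# --- example_palettes_usage.py (illustrative addition, not part of the scvelo sources) ---
# The categorical palettes above (default_20, default_26, default_64) are plain
# lists of hex colors and can be used directly as a matplotlib color cycle or
# previewed with the _plot_color_cylce helper.
import matplotlib.pyplot as plt
from cycler import cycler
from scvelo.plotting import palettes

plt.rcParams['axes.prop_cycle'] = cycler(color=palettes.default_20)
palettes._plot_color_cylce({'default_20': palettes.default_20,
                            'zeileis_26': palettes.zeileis_26})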
x: `str`, `np.ndarray` or `None` (default: `None`) x coordinate y: `str`, `np.ndarray` or `None` (default: `None`) y coordinate {scatter} Returns ------- If `show==False` a `matplotlib.Axis` """ scatter_kwargs = {"use_raw": use_raw, "sort_order": sort_order, "alpha": alpha, "components": components, "projection": projection, "legend_loc": legend_loc, "groups": groups, "palette": palette, "legend_fontsize": legend_fontsize, "legend_fontweight": legend_fontweight, "right_margin": right_margin, "left_margin": left_margin, "show": False, "save": None} adata = AnnData(np.stack([x, y]).T) if adata is None and (x is not None and y is not None) else adata colors, layers, bases = make_unique_list(color, allow_array=True), make_unique_list(layer), make_unique_list(basis) multikey = colors if len(colors) > 1 else layers if len(layers) > 1 else bases if len(bases) > 1 else None if multikey is not None: if isinstance(title, (list, tuple)): title *= int(np.ceil(len(multikey) / len(title))) ncols = len(multikey) if ncols is None else min(len(multikey), ncols) nrows = int(np.ceil(len(multikey) / ncols)) figsize = rcParams['figure.figsize'] if figsize is None else figsize for i, gs in enumerate( pl.GridSpec(nrows, ncols, pl.figure(None, (figsize[0] * ncols, figsize[1] * nrows), dpi=dpi))): if i < len(multikey): scatter(adata, x=x, y=y, size=size, linewidth=linewidth, xlabel=xlabel, ylabel=ylabel, vkey=vkey, color_map=color_map, colorbar=colorbar, perc=perc, frameon=frameon, zorder=zorder, fontsize=fontsize, density=density, linear_fit=linear_fit, ax=pl.subplot(gs), color=colors[i] if len(colors) > 1 else color, layer=layers[i] if len(layers) > 1 else layer, basis=bases[i] if len(bases) > 1 else basis, title=title[i] if isinstance(title, (list, tuple)) else title, **scatter_kwargs, **kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax else: color, layer, basis = colors[0], layers[0], bases[0] color = default_color(adata) if color is None else color color_map = default_color_map(adata, color) if color_map is None else color_map is_embedding = ((x is None) | (y is None)) and basis not in adata.var_names basis = default_basis(adata) if basis is None and is_embedding else basis size = default_size(adata) if size is None else size linewidth = 1 if linewidth is None else linewidth frameon = frameon if frameon is not None else True if not is_embedding else settings._frameon if projection == '3d': from mpl_toolkits.mplot3d import Axes3D ax = pl.figure(None, figsize, dpi=dpi).gca(projection=projection) if ax is None else ax else: ax = pl.figure(None, figsize, dpi=dpi).gca() if ax is None else ax if is_categorical(adata, color) and is_embedding: from scanpy.api.pl import scatter as scatter_ ax = scatter_(adata, basis=basis, color=color, color_map=color_map, size=size, frameon=frameon, ax=ax, title=title, **scatter_kwargs, **kwargs) else: if basis in adata.var_names: xkey, ykey = ('spliced', 'unspliced') if use_raw or 'Ms' not in adata.layers.keys() else ('Ms', 'Mu') x = make_dense(adata[:, basis].layers[xkey]) y = make_dense(adata[:, basis].layers[ykey]) xlabel, ylabel = 'spliced', 'unspliced' title = basis if title is None else title elif is_embedding: X_emb = adata.obsm['X_' + basis][:, get_components(components, basis)] x, y = X_emb[:, 0], X_emb[:, 1] elif isinstance(x, str) and isinstance(y, str) and x in adata.var_names and y in adata.var_names: x = adata[:, x].layers[layer] if layer in adata.layers.keys() else adata[:, x].X y = adata[:, y].layers[layer] 
if layer in adata.layers.keys() else adata[:, y].X if basis in adata.var_names and isinstance(color, str) and color in adata.layers.keys(): c = interpret_colorkey(adata, basis, color, perc) else: c = interpret_colorkey(adata, color, layer, perc) if layer is not None and any(l in layer for l in ['spliced', 'Ms', 'Mu', 'velocity']) \ and isinstance(color, str) and color in adata.var_names: ub = np.percentile(np.abs(c), 98) if "vmax" not in kwargs: kwargs.update({"vmax": ub}) if "vmin" not in kwargs and 'velocity' in layer: kwargs.update({"vmin": -ub}) if "vmid" in kwargs: if not isinstance(c, str) and not isinstance(c[0], str): vmid, lb, ub = kwargs["vmid"], np.min(c), np.max(c) crange = min(np.abs(vmid - lb), np.abs(ub - vmid)) kwargs.update({"vmin": vmid - crange, "vmax": vmid + crange}) kwargs.pop("vmid") if groups is not None or np.any(pd.isnull(c)): zorder = 0 if zorder is None else zorder ax = scatter(adata, basis=basis, color='lightgrey', ax=ax, zorder=zorder, **scatter_kwargs) zorder += 1 if basis in adata.var_names: fits = show_linear_fit(adata, basis, vkey, xkey, linewidth) from .simulation import show_full_dynamics if 'true_alpha' in adata.var.keys(): fit = show_full_dynamics(adata, basis, 'true', use_raw, linewidth) fits.append(fit) if 'fit_alpha' in adata.var.keys() and (vkey is None or 'dynamics' in vkey): fit = show_full_dynamics(adata, basis, 'fit', use_raw, linewidth, show_assigments=vkey is not None and 'assignment' in vkey) fits.append(fit) if len(fits) > 0 and legend_loc is not None: if legend_loc is 'none': pl.legend(fits, fontsize=legend_fontsize, loc='lower right', bbox_to_anchor=(.98, 0)) else: pl.legend(fits, fontsize=legend_fontsize, loc=legend_loc) if use_raw and perc is not None: pl.xlim(right=np.percentile(x, 99.9 if not isinstance(perc, int) else perc) * 1.05) pl.ylim(top=np.percentile(y, 99.9 if not isinstance(perc, int) else perc) * 1.05) pl.scatter(x, y, c=c, cmap=color_map, s=size, alpha=alpha, edgecolors='none', marker='.', zorder=zorder, **kwargs) if density: show_density(x, y) if linear_fit: xnew = np.linspace(0, x.max() * 1.02) pl.plot(xnew, xnew * (x * y).sum() / (x ** 2).sum()) set_label(xlabel, ylabel, fontsize, basis) set_title(title, layer, color, fontsize) ax = update_axes(ax, fontsize, is_embedding, frameon) if colorbar and not is_categorical(adata, color): set_colorbar(ax) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax PK|N'=7scvelo/plotting/simulation.pyfrom ..tools.dynamical_model_utils import unspliced, spliced, vectorize, tau_u from .utils import make_dense import numpy as np import matplotlib.pyplot as pl from matplotlib import rcParams def compute_dynamics(adata, basis, key='true', extrapolate=None, sort=True, t_=None): idx = np.where(adata.var_names == basis)[0][0] if isinstance(basis, str) else basis alpha = adata.var[key + '_alpha'][idx] if key + '_alpha' in adata.var.keys() else 1 beta = adata.var[key + '_beta'][idx] if key + '_beta' in adata.var.keys() else 1 gamma = adata.var[key + '_gamma'][idx] t_ = adata.var[key + '_t_'][idx] if t_ is None else t_ scaling = adata.var[key + '_scaling'][idx] if key + '_scaling' in adata.var.keys() else 1 if extrapolate: u0_ = unspliced(t_, 0, alpha, beta) tmax = t_ + tau_u(u0_ * 1e-4, u0_, 0, beta) t = np.concatenate([np.linspace(0, t_, num=500), t_ + np.linspace(0, tmax, num=500)]) else: t = adata.obs[key + '_t'].values if key is 'true' else adata.layers[key + '_t'][:, idx] tau, alpha, u0, s0 = vectorize(np.sort(t) if sort else t, t_, alpha, 
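
# --- example_scatter_usage.py (illustrative addition, not part of the scvelo sources) ---
# scv.pl.scatter() plots an embedding when `basis` names one (here assuming the
# Dentate Gyrus h5ad ships with a UMAP and cluster annotation), and draws phase
# portraits (unspliced vs. spliced) when gene names are passed as `basis`.
import scvelo as scv

adata = scv.datasets.dentategyrus()
scv.pl.scatter(adata, basis='umap', color='clusters', legend_loc='on data')
# after velocity estimation, phase portraits per gene (gene name is a placeholder):
# scv.pl.scatter(adata, basis=['Tmsb4x'], use_raw=True)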
beta, gamma)
    ut = unspliced(tau, u0, alpha, beta) * scaling
    st = spliced(tau, s0, u0, alpha, beta, gamma)
    vt = ut * beta - st * gamma
    return alpha, ut, st, vt


def show_full_dynamics(adata, basis, key='true', use_raw=False, linewidth=1, show_assigments=False):
    color = 'grey' if key == 'true' else 'purple'
    linewidth = .5 * linewidth if key == 'true' else linewidth
    _, ut, st, _ = compute_dynamics(adata, basis, key, extrapolate=True)
    pl.plot(st, ut, color=color, linewidth=linewidth)
    if key != 'true':
        _, ut, st, _ = compute_dynamics(adata, basis, key, extrapolate=False, sort=False)
        pl.scatter(st, ut, color=color, s=1)
        if show_assigments:
            skey, ukey = ('spliced', 'unspliced') if use_raw or 'Ms' not in adata.layers.keys() else ('Ms', 'Mu')
            s, u = make_dense(adata[:, basis].layers[skey]), make_dense(adata[:, basis].layers[ukey])
            pl.plot(np.array([s, st]), np.array([u, ut]), color='grey', linewidth=.1 * linewidth)
    idx = np.where(adata.var_names == basis)[0][0]
    beta, gamma = adata.var[key + '_beta'][idx], adata.var[key + '_gamma'][idx]
    scaling = adata.var[key + '_scaling'][idx] if key + '_scaling' in adata.var.keys() else 1
    xnew = np.linspace(0, st.max())
    label = 'learned dynamics' if key == 'fit' else 'true dynamics'
    pl.plot(xnew, gamma / beta * scaling * xnew, color=color, linestyle='--', linewidth=linewidth, label=label)
    return label


def simulation(adata, var_names='all', legend_loc='upper right', linewidth=None, dpi=None, **kwargs):
    from ..tools.utils import make_dense
    from .scatter import scatter
    var_names = adata.var_names if var_names == 'all' \
        else [name for name in var_names if name in adata.var_names]
    figsize = rcParams['figure.figsize']
    ncols = len(var_names)
    for i, gs in enumerate(pl.GridSpec(1, ncols, pl.figure(None, (figsize[0] * ncols, figsize[1]), dpi=dpi))):
        idx = np.where(adata.var_names == var_names[i])[0][0]
        alpha, ut, st, _ = compute_dynamics(adata, idx)
        t = adata.obs['true_t'] if 'true_t' in adata.obs.keys() else make_dense(adata.layers['fit_t'][:, idx])
        idx_sorted = np.argsort(t)
        t = t[idx_sorted]
        u = make_dense(adata.layers['unspliced'][:, idx])[idx_sorted]
        s = make_dense(adata.layers['spliced'][:, idx])[idx_sorted]
        ax = pl.subplot(gs)
        _kwargs = {'alpha': .3, 'title': '', 'xlabel': 'time', 'ylabel': 'counts'}
        _kwargs.update(kwargs)
        ax = scatter(x=t, y=u, color='darkblue', ax=ax, show=False, **_kwargs)
        ax = scatter(x=t, y=s, color='red', ax=ax, show=False, **_kwargs)
        linewidth = 1 if linewidth is None else linewidth
        ax.plot(t, alpha, label='alpha', linestyle='--', color='grey', linewidth=linewidth)
        ax.plot(t, ut, label='unspliced', color='darkblue', linewidth=linewidth)
        ax.plot(t, st, label='spliced', color='red', linewidth=linewidth)
        pl.xlim(0)
        pl.ylim(0)
        if legend_loc != 'none':
            pl.legend(loc=legend_loc)


# scvelo/plotting/utils.py
from .. import settings
from .. import logging as logg
from . import palettes
import os
import numpy as np
import matplotlib.pyplot as pl
from matplotlib.ticker import MaxNLocator
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from matplotlib.colors import is_color_like
from matplotlib import rcParams
from scipy.sparse import issparse
from cycler import Cycler, cycler


def make_dense(X):
    return X.A if issparse(X) and X.ndim == 2 else X.A1 if issparse(X) else X


def strings_to_categoricals(adata):
    """Transform string annotations to categoricals.
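    A minimal illustrative sketch (the `celltype` column is hypothetical; any string
    column with fewer categories than entries is converted in place):

        >>> adata.obs['celltype'] = ['A', 'B', 'A']
        >>> strings_to_categoricals(adata)
        >>> adata.obs['celltype']  # now a pandas Categorical with categories {'A', 'B'}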
""" from pandas.api.types import is_string_dtype from pandas import Categorical for df in [adata.obs, adata.var]: string_cols = [key for key in df.columns if is_string_dtype(df[key])] for key in string_cols: c = df[key] c = Categorical(c) if len(c.categories) < len(c): df[key] = c def is_categorical(adata, c): from pandas.api.types import is_categorical as cat strings_to_categoricals(adata) str_not_var = isinstance(c, str) and c not in adata.var_names return str_not_var and (c in adata.obs.keys() and cat(adata.obs[c]) or is_color_like(c)) def default_basis(adata): keys = [key for key in ['pca', 'tsne', 'umap'] if 'X_' + key in adata.obsm.keys()] return keys[-1] if len(keys) > 0 else None def make_unique_list(key, allow_array=False): from pandas import unique, Index if isinstance(key, Index): key = key.tolist() is_list = isinstance(key, (list, tuple, np.record)) if allow_array else isinstance(key, (list, tuple, np.ndarray, np.record)) is_list_of_str = is_list and all(isinstance(item, str) for item in key) return unique(key) if is_list_of_str else key if is_list and len(key) < 20 else [key] def update_axes(ax, fontsize, is_embedding=False, frameon=None): frameon = settings._frameon if frameon is None else frameon if frameon: if is_embedding: ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False) else: ax.xaxis.set_major_locator(MaxNLocator(nbins=3, integer=True)) ax.yaxis.set_major_locator(MaxNLocator(nbins=3, integer=True)) labelsize = int(fontsize * .75) if fontsize is not None else None ax.tick_params(axis='both', which='major', labelsize=labelsize) else: ax.set_xlabel('') ax.set_ylabel('') ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False) ax.set_frame_on(False) return ax def set_label(xlabel, ylabel, fontsize=None, basis=None): if isinstance(xlabel, str) and isinstance(ylabel, str): pl.xlabel(xlabel, fontsize=fontsize) pl.ylabel(ylabel, fontsize=fontsize) elif basis is not None: component_name = ('DC' if 'diffmap' in basis else 'tSNE' if basis == 'tsne' else 'UMAP' if basis == 'umap' else 'PC' if basis == 'pca' else basis.replace('draw_graph_', '').upper() if 'draw_graph' in basis else basis) pl.xlabel(component_name + '1') pl.ylabel(component_name + '2') def set_title(title, layer=None, color=None, fontsize=None): if isinstance(title, str): title = title.replace('_', ' ') pl.title(title, fontsize=fontsize) elif isinstance(layer, str) and isinstance(color, str): title = (color + ' ' + layer).replace('_', ' ') pl.title(title, fontsize=fontsize) elif isinstance(color, str): title = color.replace('_', ' ') pl.title(title, fontsize=fontsize) def set_frame(ax, frameon): frameon = settings._frameon if frameon is None else frameon if not frameon: ax.set_xlabel('') ax.set_ylabel('') ax.set_frame_on(False) return ax def default_size(adata): adjusted, classic = 1.2e5 / adata.n_obs, 20 return np.mean([adjusted, classic]) if settings._rcParams_style == 'scvelo' else adjusted def default_color(adata): return 'clusters' if 'clusters' in adata.obs.keys() else 'louvain' if 'louvain' in adata.obs.keys() else 'grey' def default_color_map(adata, c): return 'viridis_r' if isinstance(c, str) and c in adata.obs.keys() and not is_categorical(adata, c)\ and adata.obs[c].min() == 0 and adata.obs[c].max() == 1 else None def clip(c, perc): if np.size(perc) < 2: perc = [perc, 100] if perc < 50 else [0, perc] lb, ub = np.percentile(c, perc) return np.clip(c, lb, ub) def get_colors(adata, c): if is_color_like(c): return c else: if 
c+'_colors' not in adata.uns.keys(): palette = default_palette(None) palette = adjust_palette(palette, length=len(adata.obs[c].cat.categories)) adata.uns[c + '_colors'] = palette[:len(adata.obs[c].cat.categories)].by_key()['color'] cluster_ix = adata.obs[c].cat.codes return np.array([adata.uns[c + '_colors'][cluster_ix[i]] for i in range(adata.n_obs)]) def interpret_colorkey(adata, c=None, layer=None, perc=None): if c is None: c = default_color(adata) if is_categorical(adata, c): c = get_colors(adata, c) elif isinstance(c, str): if c in adata.obs.keys(): # color by observation key c = adata.obs[c] elif c in adata.var_names: # color by var in specific layer c = adata[:, c].layers[layer] if layer in adata.layers.keys() else adata[:, c].X c = c.A.flatten() if issparse(c) else c else: raise ValueError('color key is invalid! pass valid observation annotation or a gene name') if perc is not None: c = clip(c, perc=perc) elif len(np.array(c).flatten()) == adata.n_obs: # continuous coloring c = np.array(c).flatten() if perc is not None: c = clip(c, perc=perc) else: raise ValueError('color key is invalid! pass valid observation annotation or a gene name') return c def get_components(components=None, basis=None, projection=None): if components is None: components = '1,2,3' if projection == '3d' else '1,2' if isinstance(components, str): components = components.split(',') components = np.array(components).astype(int) - 1 if 'diffmap' in basis or 'vmap' in basis: components += 1 return components def set_colorbar(ax, orientation='vertical'): cb = pl.colorbar(orientation=orientation, cax=inset_axes(ax, width="2%", height="30%", loc=4, borderpad=0)) cb.set_alpha(1) cb.draw_all() cb.locator = MaxNLocator(nbins=3, integer=True) cb.update_ticks() def default_arrow(size): if isinstance(size, (list, tuple)) and len(size) == 3: head_l, head_w, ax_l = size elif type(size) == int or type(size) == float: head_l, head_w, ax_l = 12 * size, 10 * size, 8 * size else: head_l, head_w, ax_l = 12, 10, 8 return head_l, head_w, ax_l def savefig_or_show(writekey, show=None, dpi=None, ext=None, save=None): if isinstance(save, str): # check whether `save` contains a figure extension if ext is None: for try_ext in ['.svg', '.pdf', '.png']: if save.endswith(try_ext): ext = try_ext[1:] save = save.replace(try_ext, '') break # append it writekey = 'velocity_' + (writekey + '_' if len(writekey) > 0 else '') + save save = True save = settings.autosave if save is None else save show = settings.autoshow if show is None else show if save: if dpi is None: # we need this as in notebooks, the internal figures are also influenced by 'savefig.dpi' this... if not isinstance(rcParams['savefig.dpi'], str) and rcParams['savefig.dpi'] < 150: if settings._low_resolution_warning: logg.warn( 'You are using a low resolution (dpi<150) for saving figures.\n' 'Consider running `set_figure_params(dpi_save=...)`, which will ' 'adjust `matplotlib.rcParams[\'savefig.dpi\']`') settings._low_resolution_warning = False else: dpi = rcParams['savefig.dpi'] if not os.path.exists(settings.figdir): os.makedirs(settings.figdir) if settings.figdir[-1] != '/': settings.figdir += '/' if ext is None: ext = settings.file_format_figs filename = settings.figdir + writekey + settings.plot_suffix + '.' 
+ ext # output the following msg at warning level; it's really important for the user logg.msg('saving figure to file', filename, v=1) pl.savefig(filename, dpi=dpi, bbox_inches='tight') if show: pl.show() if save: pl.close() # clear figure def default_palette(palette=None): if palette is None: return rcParams['axes.prop_cycle'] elif not isinstance(palette, Cycler): return cycler(color=palette) else: return palette def adjust_palette(palette, length): islist = False if isinstance(palette, list): islist = True if ((islist and len(palette) < length) or (not isinstance(palette, list) and len(palette.by_key()['color']) < length)): if length <= 28: palette = palettes.default_26 elif length <= len(palettes.default_64): # 103 colors palette = palettes.default_64 else: palette = ['grey' for i in range(length)] logg.info('more than 103 colors would be required, initializing as \'grey\'') return palette if islist else cycler(color=palette) elif islist: return palette elif not isinstance(palette, Cycler): return cycler(color=palette) else: return palette def show_linear_fit(adata, basis, vkey, xkey, linewidth=1): xnew = np.linspace(0, np.percentile(make_dense(adata[:, basis].layers[xkey]), 98)) vkeys = adata.layers.keys() if vkey is None else make_unique_list(vkey) fits = [fit for fit in vkeys if all(['velocity' in fit, fit + '_gamma' in adata.var.keys()])] for i, fit in enumerate(fits): linestyle = '--' if 'variance_' + fit in adata.layers.keys() else '-' gamma = adata[:, basis].var[fit + '_gamma'].values if fit + '_gamma' in adata.var.keys() else 1 beta = adata[:, basis].var[fit + '_beta'].values if fit + '_beta' in adata.var.keys() else 1 offset = adata[:, basis].var[fit + '_offset'].values if fit + '_offset' in adata.var.keys() else 0 pl.plot(xnew, gamma / beta * xnew + offset / beta, linestyle=linestyle, linewidth=linewidth, c='k' if i == 0 else None) return fits def show_density(x, y, eval_pts=50, scale=10, alpha=.3): from scipy.stats import gaussian_kde as kde offset = max(y) / scale b_s = np.linspace(min(x), max(x), eval_pts) dvals_s = kde(x)(b_s) scale_s = offset / np.max(dvals_s) offset *= 1.3 # offset = - np.max(y) * 1.1 pl.plot(b_s, dvals_s * scale_s - offset, color='grey') pl.fill_between(b_s, -offset, dvals_s * scale_s - offset, alpha=alpha, color='grey') pl.ylim(-offset) offset = max(x) / scale b_u = np.linspace(min(y), max(y), eval_pts) dvals_u = kde(y)(b_u) scale_u = offset / np.max(dvals_u) offset *= 1.3 # offset = - np.max(x) * 1.1 pl.plot(dvals_u * scale_u - offset, b_u, color='grey') pl.fill_between(dvals_u * scale_u - offset, 0, b_u, alpha=alpha, color='grey') pl.xlim(-offset) def hist(arrays, alpha=.5, bins=None, colors=None, labels=None, xlabel=None, ylabel=None, ax=None, figsize=None, dpi=None, show=True): ax = pl.figure(None, figsize, dpi=dpi) if ax is None else ax arrays = arrays if isinstance(arrays, (list, tuple)) or arrays.ndim > 1 else [arrays] palette = default_palette(None).by_key()['color'][::-1] colors = palette if colors is None or len(colors) < len(arrays) else colors for i, array in enumerate(arrays): pl.hist(array[np.isfinite(array)], bins=bins, alpha=alpha, color=colors[i], label=labels[i] if labels is not None else None) pl.xlabel(xlabel if xlabel is not None else '') pl.ylabel(ylabel if xlabel is not None else '') if labels is not None: pl.legend() if not show: return ax else: pl.show() def plot(arrays, normalize=False, colors=None, labels=None, xlabel=None, ylabel=None, xscale=None, yscale=None, ax=None, figsize=None, dpi=None, show=True): ax = 
pl.figure(None, figsize, dpi=dpi) if ax is None else ax arrays = np.array(arrays) arrays = arrays if isinstance(arrays, (list, tuple)) or arrays.ndim > 1 else [arrays] palette = default_palette(None).by_key()['color'][::-1] colors = palette if colors is None or len(colors) < len(arrays) else colors for i, array in enumerate(arrays): X = array[np.isfinite(array)] X = X / np.max(X) if normalize else X pl.plot(X, color=colors[i], label=labels[i] if labels is not None else None) pl.xlabel(xlabel if xlabel is not None else '') pl.ylabel(ylabel if xlabel is not None else '') if labels is not None: pl.legend() if xscale is not None: pl.xscale(xscale) if yscale is not None: pl.yscale(yscale) if not show: return ax else: pl.show() def fraction_timeseries(adata, xkey='clusters', tkey='dpt_pseudotime', bins=30, legend_loc='best', title=None, fontsize=None, ax=None, figsize=None, dpi=None, xlabel=None, ylabel=None, show=True): t = np.linspace(0, 1 + 1 / bins, bins) types = np.unique(adata.obs[xkey].values) y = [] for i in range(bins - 1): mask = np.all([adata.obs[tkey].values <= t[i + 1], adata.obs[tkey].values > t[i]], axis=0) x = list(adata[mask].obs[xkey].values) y.append([]) for name in types: occur = x.count(name) y[-1].append(occur) y[-1] /= np.sum(y[-1]) y = np.array(y).T ax = pl.figure(figsize=figsize, dpi=dpi) if ax is None else ax pl.stackplot(t[:-1], y, baseline='zero', labels=types, colors=adata.uns['clusters_colors'] if 'clusters_colors' in adata.uns.keys() else None, edgecolor='white') pl.legend(types, loc=legend_loc) if title is not None: pl.title(title, fontsize=fontsize) pl.xlabel(tkey if xlabel is None else xlabel, fontsize=fontsize) pl.ylabel(xkey + ' fractions' if ylabel is None else ylabel, fontsize=fontsize) pl.xlim(adata.obs[tkey].values.min(), adata.obs[tkey].values.max()) pl.ylim(0, 1) if not show: return ax else: pl.show() # def phase(adata, var=None, x=None, y=None, color='louvain', fits='all', xlabel='spliced', ylabel='unspliced', # fontsize=None, show=True, ax=None, **kwargs): # if isinstance(var, str) and (var in adata.var_names): # if (x is None) or (y is None): # ix = np.where(adata.var_names == var)[0][0] # x, y = adata.layers['Ms'][:, ix], adata.layers['Mu'][:, ix] # else: # ValueError('var not found in adata.var_names.') # # ax = scatter(adata, x=x, y=y, color=color, frameon=True, title=var, xlabel=xlabel, ylabel=ylabel, ax=ax, **kwargs) # # xnew = np.linspace(0, x.max() * 1.02) # fits = adata.layers.keys() if fits == 'all' else fits # fits = [fit for fit in fits if 'velocity' in fit] # for fit in fits: # linestyle = '--' if 'stochastic' in fit else '-' # pl.plot(xnew, adata.var[fit+'_gamma'][ix] / adata.var[fit+'_beta'][ix] * xnew # + adata.var[fit+'_offset'][ix] / adata.var[fit+'_beta'][ix], c='k', linestyle=linestyle) # # if show: pl.show() # else: return axPKGUfNvݪ??scvelo/plotting/velocity.pyfrom ..preprocessing.moments import second_order_moments from ..tools.rank_velocity_genes import rank_velocity_genes from .scatter import scatter from .utils import savefig_or_show, default_basis, default_size import numpy as np import pandas as pd from matplotlib import rcParams from matplotlib.ticker import MaxNLocator import matplotlib.pyplot as pl from scipy.sparse import issparse def velocity(adata, var_names=None, basis=None, groupby=None, groups=None, mode=None, fits='all', layers='all', color=None, color_map='RdBu_r', colorbar=False, perc=[2,98], use_raw=False, size=None, alpha=.5, fontsize=None, figsize=None, dpi=None, show=True, save=None, ax=None, ncols=None, 
**kwargs): """Phase and velocity plot for set of genes. The phase plot shows spliced against unspliced expressions with steady-state fit. Further the embedding is shown colored by velocity and expression. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. var_names: `str` or list of `str` (default: `None`) Which variables to show. basis: `str` (default: `'umap'`) Key for embedding coordinates. mode: `'stochastic'` or `None` (default: `None`) Whether to show show covariability phase portrait. fits: `str` or list of `str` (default: `'all'`) Which steady-state estimates to show. layers: `str` or list of `str` (default: `'all'`) Which layers to show. color: `str`, list of `str` or `None` (default: `None`) Key for annotations of observations/cells or variables/genes color_map: `str` (default: `matplotlib.rcParams['image.cmap']`) String denoting matplotlib color map. perc: tuple, e.g. [2,98] (default: `None`) Specify percentile for continuous coloring. size: `float` (default: 5) Point size. alpha: `float` (default: 1) Set blending - 0 transparent to 1 opaque. fontsize: `float` (default: `None`) Label font size. figsize: tuple (default: `(7,5)`) Figure size. dpi: `int` (default: 80) Figure dpi. show: `bool`, optional (default: `None`) Show the plot, do not return axis. save: `bool` or `str`, optional (default: `None`) If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {'.pdf', '.png', '.svg'}. ax: `matplotlib.Axes`, optional (default: `None`) A matplotlib axes object. Only works if plotting a single component. """ basis = default_basis(adata) if basis is None else basis if isinstance(groupby, str) and groupby in adata.obs.keys(): if 'rank_velocity_genes' not in adata.uns.keys() or adata.uns['rank_velocity_genes']['params']['groupby'] != groupby: rank_velocity_genes(adata, vkey='velocity', n_genes=10, groupby=groupby) names = np.array(adata.uns['rank_velocity_genes']['names'].tolist()) if groups is None: var_names = names[:, 0] else: groups = [groups] if isinstance(groups, str) else groups idx = np.array([any([g in group for g in groups]) for group in adata.obs[groupby].cat.categories]) var_names = np.hstack(names[idx, :int(10 / idx.sum())]) elif var_names is not None: var_names = [var_names] if isinstance(var_names, str) else [var for var in var_names if var in adata.var_names] else: raise ValueError('No var_names or groups specified.') var_names = pd.unique(var_names) (skey, ukey) = ('spliced', 'unspliced') if use_raw else ('Ms', 'Mu') layers = ['velocity', skey, 'variance_velocity'] if layers == 'all' else layers layers = [layer for layer in layers if layer in adata.layers.keys()] fits = adata.layers.keys() if fits == 'all' else fits fits = [fit for fit in fits if all(['velocity' in fit, fit + '_gamma' in adata.var.keys()])] stochastic_fits = [fit for fit in fits if 'variance_' + fit in adata.layers.keys()] nplts = (1 + len(layers) + (mode == 'stochastic') * 2) ncols = 1 if ncols is None else ncols nrows = int(np.ceil(len(var_names) / ncols)) ncols = int(ncols * nplts) figsize = rcParams['figure.figsize'] if figsize is None else figsize ax = pl.figure(figsize=(figsize[0] * ncols / 2, figsize[1] * nrows / 2), dpi=dpi) if ax is None else ax gs = pl.GridSpec(nrows, ncols, wspace=0.3, hspace=0.5) size = default_size(adata) / 2 if size is None else size # since fontsize is halved in width and height fontsize = rcParams['font.size'] if fontsize is None else fontsize for v, var in enumerate(var_names): _adata = 
adata[:, var] s, u = _adata.layers[skey], _adata.layers[ukey] if issparse(s): s, u = s.A, u.A # spliced/unspliced phase portrait with steady-state estimate ax = pl.subplot(gs[v * nplts]) scatter(adata, basis=var, color=color, colorbar=colorbar, frameon=True, title=var, size=size, use_raw=use_raw, alpha=alpha, fontsize=fontsize, xlabel='spliced', ylabel='unspliced', show=False, ax=ax, save=False, legend_loc=None if v < len(var_names)-1 else 'lower right', **kwargs) # velocity and expression plots for l, layer in enumerate(layers): ax = pl.subplot(gs[v * nplts + l + 1]) title = 'expression' if layer == skey else layer scatter(adata, basis=basis, color=var, layer=layer, color_map=color_map, colorbar=colorbar, title=title, perc=perc, use_raw=use_raw, fontsize=fontsize, size=size, alpha=alpha, frameon=False, show=False, ax=ax, save=False, **kwargs) if mode == 'stochastic': ss, us = second_order_moments(_adata) ss, us = ss.flatten(), us.flatten() fit = stochastic_fits[0] ax = pl.subplot(gs[v * nplts + len(layers) + 1]) offset = _adata.var[fit + '_offset'] if fit + '_offset' in adata.var.keys() else 0 beta = _adata.var[fit + '_beta'] if fit + '_beta' in adata.var.keys() else 1 x = 2 * (ss - s**2) - s y = 2 * (us - u * s) + u + 2 * s * offset / beta scatter(adata, x=x, y=y, color=color, colorbar=colorbar, title=var, fontsize=40/ncols, size=size, perc=perc, xlabel=r'2 $\Sigma_s - \langle s \rangle$', ylabel=r'2 $\Sigma_{us} + \langle u \rangle$', use_raw=use_raw, frameon=True, ax=ax, save=False, show=False, **kwargs) xnew = np.linspace(x.min(), x.max() * 1.02) for fit in stochastic_fits: gamma = _adata.var[fit + '_gamma'].values if fit + '_gamma' in adata.var.keys() else 1 beta = _adata.var[fit + '_beta'].values if fit + '_beta' in adata.var.keys() else 1 offset2 = _adata.var[fit + '_offset2'].values if fit + '_offset2' in adata.var.keys() else 0 pl.plot(xnew, gamma / beta * xnew + offset2 / beta, c='k', linestyle='--') if v == len(var_names) - 1: pl.legend(fits, loc='lower right', prop={'size': 34/ncols}) savefig_or_show('', dpi=dpi, save=save, show=show) if not show: return ax PKtDNSЯ%scvelo/plotting/velocity_embedding.pyfrom ..tools.velocity_embedding import velocity_embedding as compute_velocity_embedding from ..tools.utils import groups_to_bool from .utils import interpret_colorkey, default_basis, default_size, get_components, savefig_or_show, default_color, \ default_arrow, make_unique_list from .scatter import scatter from .docs import doc_scatter, doc_params from matplotlib import rcParams from matplotlib.colors import is_color_like import matplotlib.pyplot as pl import numpy as np @doc_params(scatter=doc_scatter) def velocity_embedding(adata, basis=None, vkey='velocity', density=None, arrow_size=None, arrow_length=None, scale=None, X=None, V=None, color=None, use_raw=None, layer=None, color_map=None, colorbar=True, palette=None, size=None, alpha=.2, perc=None, sort_order=True, groups=None, components=None, projection='2d', legend_loc='none', legend_fontsize=None, legend_fontweight=None, right_margin=None, left_margin=None, xlabel=None, ylabel=None, title=None, fontsize=None, figsize=None, dpi=None, frameon=None, show=True, save=None, ax=None, ncols=None, **kwargs): """\ Scatter plot of velocities on the embedding. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. 
x: `str`, `np.ndarray` or `None` (default: `None`) x coordinate y: `str`, `np.ndarray` or `None` (default: `None`) y coordinate vkey: `str` or `None` (default: `None`) Key for annotations of observations/cells or variables/genes. density: `float` (default: 1) Amount of velocities to show - 0 none to 1 all arrow_size: `float` or 3-tuple for headlength, headwidth and headaxislength (default: 1) Size of arrows. arrow_length: `float` (default: 1) Length of arrows. scale: `float` (default: 1) Length of velocities in the embedding. {scatter} Returns ------- `matplotlib.Axis` if `show==False` """ basis = default_basis(adata) if basis is None else basis colors, layers, vkeys = make_unique_list(color, allow_array=True), make_unique_list(layer), make_unique_list(vkey) for key in vkeys: if key + '_' + basis not in adata.obsm_keys() and V is None: compute_velocity_embedding(adata, basis=basis, vkey=key) scatter_kwargs = {"basis": basis, "perc": perc, "use_raw": use_raw, "sort_order": sort_order, "alpha": alpha, "components": components, "projection": projection, "legend_loc": legend_loc, "groups": groups, "legend_fontsize": legend_fontsize, "legend_fontweight": legend_fontweight, "palette": palette, "color_map": color_map, "frameon": frameon, "xlabel": xlabel, "ylabel": ylabel, "right_margin": right_margin, "left_margin": left_margin, "colorbar": colorbar, "dpi": dpi, "fontsize": fontsize, "show": False, "save": None} multikey = colors if len(colors) > 1 else layers if len(layers) > 1 else vkeys if len(vkeys) > 1 else None if multikey is not None: if isinstance(title, (list, tuple)): title *= int(np.ceil(len(multikey) / len(title))) ncols = len(multikey) if ncols is None else min(len(multikey), ncols) nrows = int(np.ceil(len(multikey) / ncols)) figsize = rcParams['figure.figsize'] if figsize is None else figsize for i, gs in enumerate( pl.GridSpec(nrows, ncols, pl.figure(None, (figsize[0] * ncols, figsize[1] * nrows), dpi=dpi))): if i < len(multikey): velocity_embedding(adata, density=density, scale=scale, size=size, ax=pl.subplot(gs), arrow_size=arrow_size, arrow_length=arrow_length, color=colors[i] if len(colors) > 1 else color, layer=layers[i] if len(layers) > 1 else layer, vkey=vkeys[i] if len(vkeys) > 1 else vkey, title=title[i] if isinstance(title, (list, tuple)) else title, **scatter_kwargs, **kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax else: if projection == '3d': from mpl_toolkits.mplot3d import Axes3D ax = pl.figure(None, figsize, dpi=dpi).gca(projection=projection) if ax is None else ax else: ax = pl.figure(None, figsize, dpi=dpi).gca() if ax is None else ax color, layer, vkey = colors[0], layers[0], vkeys[0] color = default_color(adata) if color is None else color size = default_size(adata) / 2 if size is None else size _adata = adata[groups_to_bool(adata, groups, groupby=color)] if groups is not None and color in adata.obs.keys() else adata density = 1 if density is None or density > 1 else density ix_choice = np.random.choice(_adata.n_obs, size=int(density * _adata.n_obs), replace=False) x, y = None if X is None else X[:, 0], None if X is None else X[:, 1] X = _adata.obsm['X_' + basis][:, get_components(components, basis, projection)][ix_choice] if X is None else X[:, :2][ix_choice] V = _adata.obsm[vkey + '_' + basis][:, get_components(components, basis, projection)][ix_choice] if V is None else V[:, :2][ix_choice] hl, hw, hal = default_arrow(arrow_size) scale = 1 / arrow_length if arrow_length is not None else scale if 
scale is not None else 1 quiver_kwargs = {"scale": scale, "cmap": color_map, "angles": 'xy', "scale_units": 'xy', "width": .0005, "edgecolors": 'k', "headlength": hl, "headwidth": hw, "headaxislength": hal, "linewidth": .1} quiver_kwargs.update(kwargs) c = interpret_colorkey(_adata, color, layer, perc) c = c[ix_choice] if len(c) == _adata.n_obs else c if projection == '3d' and X.shape[1] > 2 and V.shape[1] > 2: V, size = V / scale / 5, size / 10 x0, x1, x2, v0, v1, v2 = X[:, 0], X[:, 1], X[:, 2], V[:, 0], V[:, 1], V[:, 2] quiver3d_kwargs = {"zorder": 3, "linewidth": .5, "arrow_length_ratio": .3} c = list(c) + [element for element in list(c) for _ in range(2)] if is_color_like(c[0]): pl.quiver(x0, x1, x2, v0, v1, v2, color=c, **quiver3d_kwargs) else: pl.quiver(x0, x1, x2, v0, v1, v2, c, **quiver3d_kwargs) else: if is_color_like(c[0]): pl.quiver(X[:, 0], X[:, 1], V[:, 0], V[:, 1], color=c, zorder=3, **quiver_kwargs) else: pl.quiver(X[:, 0], X[:, 1], V[:, 0], V[:, 1], c, zorder=3, **quiver_kwargs) ax = scatter(adata, x=x, y=y, layer=layer, color=color, size=size, title=title, ax=ax, zorder=0, **scatter_kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax PKtDN:-#-#*scvelo/plotting/velocity_embedding_grid.pyfrom ..tools.velocity_embedding import quiver_autoscale, velocity_embedding from ..tools.utils import groups_to_bool from .utils import default_basis, default_size, default_color, get_components, savefig_or_show, default_arrow, make_unique_list from .scatter import scatter from .docs import doc_scatter, doc_params from sklearn.neighbors import NearestNeighbors from scipy.stats import norm as normal from matplotlib import rcParams import matplotlib.pyplot as pl import numpy as np def compute_velocity_on_grid(X_emb, V_emb, density=None, smooth=None, n_neighbors=None, min_mass=None, autoscale=True, adjust_for_stream=False): # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = .5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - .01 * np.abs(M - m) M = M + .01 * np.abs(M - m) gr = np.linspace(m, M, 50 * density) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # estimate grid velocities if n_neighbors is None: n_neighbors = int(n_obs/50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) / np.maximum(1, p_mass)[:, None] if adjust_for_stream: X_grid = np.stack([np.unique(X_grid[:, 0]), np.unique(X_grid[:, 1])]) ns = int(np.sqrt(len(V_grid[:, 0]))) V_grid = V_grid.T.reshape(2, ns, ns) mass = np.sqrt((V_grid ** 2).sum(0)) V_grid[0][mass.reshape(V_grid[0].shape) < 1e-5] = np.nan else: if min_mass is None: min_mass = np.clip(np.percentile(p_mass, 99) / 100, 1e-2, 1) X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= 3 * quiver_autoscale(X_grid, V_grid) return X_grid, V_grid @doc_params(scatter=doc_scatter) def velocity_embedding_grid(adata, basis=None, vkey='velocity', density=None, smooth=None, min_mass=None, arrow_size=None, arrow_length=None, arrow_color=None, scale=None, autoscale=True, n_neighbors=None, X=None, V=None, X_grid=None, V_grid=None, principal_curve=False, color=None, use_raw=None, 
layer=None, color_map=None, colorbar=True, palette=None, size=None, alpha=.2, perc=None, sort_order=True, groups=None, components=None, projection='2d', legend_loc='none', legend_fontsize=None, legend_fontweight=None, right_margin=None, left_margin=None, xlabel=None, ylabel=None, title=None, fontsize=None, figsize=None, dpi=None, frameon=None, show=True, save=None, ax=None, ncols=None, **kwargs): """\ Scatter plot of velocities on a grid. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. x: `str`, `np.ndarray` or `None` (default: `None`) x coordinate y: `str`, `np.ndarray` or `None` (default: `None`) y coordinate vkey: `str` or `None` (default: `None`) Key for annotations of observations/cells or variables/genes. density: `float` (default: 1) Amount of velocities to show - 0 none to 1 all arrow_size: `float` or 3-tuple for headlength, headwidth and headaxislength (default: 1) Size of arrows. arrow_length: `float` (default: 1) Length of arrows. scale: `float` (default: 1) Length of velocities in the embedding. min_mass: `float` (default: 0.5) Minimum threshold for mass to be shown. smooth: `float` (default: 0.5) Multiplication factor for scale in Gaussian kernel around grid point. n_neighbors: `int` (default: None) Number of neighbors to consider around grid point. X: `np.ndarray` (default: None) embedding grid point coordinates V: `np.ndarray` (default: None) embedding grid velocity coordinates {scatter} Returns ------- `matplotlib.Axis` if `show==False` """ basis = default_basis(adata) if basis is None else basis colors, layers, vkeys = make_unique_list(color, allow_array=True), make_unique_list(layer), make_unique_list(vkey) for key in vkeys: if key + '_' + basis not in adata.obsm_keys() and V is None: velocity_embedding(adata, basis=basis, vkey=key) color, layer, vkey = colors[0], layers[0], vkeys[0] color = default_color(adata) if color is None else color if X_grid is None or V_grid is None: _adata = adata[groups_to_bool(adata, groups, groupby=color)] \ if groups is not None and color in adata.obs.keys() else adata X_emb = _adata.obsm['X_' + basis][:, get_components(components, basis)] if X is None else X[:, :2] V_emb = _adata.obsm[vkey + '_' + basis][:, get_components(components, basis)] if V is None else V[:, :2] X_grid, V_grid = compute_velocity_on_grid(X_emb=X_emb, V_emb=V_emb, density=density, autoscale=autoscale, smooth=smooth, n_neighbors=n_neighbors, min_mass=min_mass) scatter_kwargs = {"basis": basis, "perc": perc, "use_raw": use_raw, "sort_order": sort_order, "alpha": alpha, "components": components, "projection": projection, "legend_loc": legend_loc, "groups": groups, "legend_fontsize": legend_fontsize, "legend_fontweight": legend_fontweight, "palette": palette, "color_map": color_map, "frameon": frameon, "xlabel": xlabel, "ylabel": ylabel, "right_margin": right_margin, "left_margin": left_margin, "colorbar": colorbar, "dpi": dpi, "fontsize": fontsize, "show": False, "save": None} multikey = colors if len(colors) > 1 else layers if len(layers) > 1 else vkeys if len(vkeys) > 1 else None if multikey is not None: if isinstance(title, (list, tuple)): title *= int(np.ceil(len(multikey) / len(title))) ncols = len(multikey) if ncols is None else min(len(multikey), ncols) nrows = int(np.ceil(len(multikey) / ncols)) figsize = rcParams['figure.figsize'] if figsize is None else figsize for i, gs in enumerate( pl.GridSpec(nrows, ncols, pl.figure(None, (figsize[0] * ncols, figsize[1] * nrows), dpi=dpi))): if i < len(multikey): velocity_embedding_grid(adata, 
density=density, scale=scale, size=size, min_mass=min_mass, smooth=smooth, n_neighbors=n_neighbors, principal_curve=principal_curve, ax=pl.subplot(gs), arrow_size=arrow_size, arrow_length=arrow_length, color=colors[i] if len(colors) > 1 else color, layer=layers[i] if len(layers) > 1 else layer, vkey=vkeys[i] if len(vkeys) > 1 else vkey, title=title[i] if isinstance(title, (list, tuple)) else title, X_grid=None if len(vkeys) > 1 else X_grid, V_grid=None if len(vkeys) > 1 else V_grid, autoscale=False if len(vkeys) > 1 else autoscale, **scatter_kwargs, **kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax else: ax = pl.figure(None, figsize, dpi=dpi).gca() if ax is None else ax hl, hw, hal = default_arrow(arrow_size) scale = 1 / arrow_length if arrow_length is not None else scale if scale is not None else 1 quiver_kwargs = {"scale": scale, "angles": 'xy', "scale_units": 'xy', "width": .001, "color": 'grey' if arrow_color is None else arrow_color, "edgecolors": 'k', "headlength": hl/2, "headwidth": hw/2, "headaxislength": hal/2, "linewidth": .2} quiver_kwargs.update(kwargs) pl.quiver(X_grid[:, 0], X_grid[:, 1], V_grid[:, 0], V_grid[:, 1], **quiver_kwargs, zorder=3) if principal_curve: curve = adata.uns['principal_curve']['projections'] pl.plot(curve[:, 0], curve[:, 1], c="w", lw=6, zorder=4) pl.plot(curve[:, 0], curve[:, 1], c="k", lw=3, zorder=5) size = 4 * default_size(adata) if size is None else size ax = scatter(adata, layer=layer, color=color, size=size, title=title, ax=ax, zorder=0, **scatter_kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax PKuDNe,scvelo/plotting/velocity_embedding_stream.pyfrom ..tools.velocity_embedding import velocity_embedding from ..tools.utils import groups_to_bool from .utils import default_basis, default_size, default_color, get_components, savefig_or_show, make_unique_list from .velocity_embedding_grid import compute_velocity_on_grid from .scatter import scatter from .docs import doc_scatter, doc_params from matplotlib import rcParams import matplotlib.pyplot as pl import numpy as np @doc_params(scatter=doc_scatter) def velocity_embedding_stream(adata, basis=None, vkey='velocity', density=None, smooth=None, linewidth=None, n_neighbors=None, X=None, V=None, X_grid=None, V_grid=None, color=None, use_raw=None, layer=None, color_map=None, colorbar=True, palette=None, size=None, alpha=.1, perc=None, sort_order=True, groups=None, components=None, legend_loc='on data', legend_fontsize=None, legend_fontweight=None, right_margin=None, left_margin=None, xlabel=None, ylabel=None, title=None, fontsize=None, figsize=None, dpi=None, frameon=None, show=True, save=None, ax=None, ncols=None, **kwargs): """\ Stream plot of velocities on the embedding. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. x: `str`, `np.ndarray` or `None` (default: `None`) x coordinate y: `str`, `np.ndarray` or `None` (default: `None`) y coordinate vkey: `str` or `None` (default: `None`) Key for annotations of observations/cells or variables/genes. density: `float` (default: 1) Amount of velocities to show - 0 none to 1 all smooth: `float` (default: 0.5) Multiplication factor for scale in Gaussian kernel around grid point. linewidth: `float` (default: 1) Line width for streamplot. n_neighbors: `int` (default: None) Number of neighbors to consider around grid point. 
X: `np.ndarray` (default: None) Embedding grid point coordinates V: `np.ndarray` (default: None) Embedding grid velocity coordinates {scatter} Returns ------- `matplotlib.Axis` if `show==False` """ basis = default_basis(adata) if basis is None else basis colors, layers, vkeys = make_unique_list(color, allow_array=True), make_unique_list(layer), make_unique_list(vkey) for key in vkeys: if key + '_' + basis not in adata.obsm_keys() and V is None: velocity_embedding(adata, basis=basis, vkey=key) color, layer, vkey = colors[0], layers[0], vkeys[0] color = default_color(adata) if color is None else color if X_grid is None or V_grid is None: _adata = adata[groups_to_bool(adata, groups, groupby=color)] \ if groups is not None and color in adata.obs.keys() else adata X_emb = _adata.obsm['X_' + basis][:, get_components(components, basis)] if X is None else X[:, :2] V_emb = _adata.obsm[vkey + '_' + basis][:, get_components(components, basis)] if V is None else V[:, :2] X_grid, V_grid = compute_velocity_on_grid(X_emb=X_emb, V_emb=V_emb, density=1, smooth=smooth, n_neighbors=n_neighbors, autoscale=False, adjust_for_stream=True) lengths = np.sqrt((V_grid ** 2).sum(0)) linewidth = 1 if linewidth is None else linewidth linewidth *= 2 * lengths / lengths[~np.isnan(lengths)].max() scatter_kwargs = {"basis": basis, "perc": perc, "use_raw": use_raw, "sort_order": sort_order, "alpha": alpha, "components": components, "legend_loc": legend_loc, "groups": groups, "legend_fontsize": legend_fontsize, "legend_fontweight": legend_fontweight, "palette": palette, "color_map": color_map, "frameon": frameon, "xlabel": xlabel, "ylabel": ylabel, "right_margin": right_margin, "left_margin": left_margin, "colorbar": colorbar, "dpi": dpi, "fontsize": fontsize, "show": False, "save": None} multikey = colors if len(colors) > 1 else layers if len(layers) > 1 else vkeys if len(vkeys) > 1 else None if multikey is not None: if isinstance(title, (list, tuple)): title *= int(np.ceil(len(multikey) / len(title))) ncols = len(multikey) if ncols is None else min(len(multikey), ncols) nrows = int(np.ceil(len(multikey) / ncols)) figsize = rcParams['figure.figsize'] if figsize is None else figsize for i, gs in enumerate( pl.GridSpec(nrows, ncols, pl.figure(None, (figsize[0] * ncols, figsize[1] * nrows), dpi=dpi))): if i < len(multikey): velocity_embedding_stream(adata, density=density, size=size, smooth=smooth, n_neighbors=n_neighbors, linewidth=linewidth, ax=pl.subplot(gs), color=colors[i] if len(colors) > 1 else color, layer=layers[i] if len(layers) > 1 else layer, vkey=vkeys[i] if len(vkeys) > 1 else vkey, title=title[i] if isinstance(title, (list, tuple)) else title, X_grid=None if len(vkeys) > 1 else X_grid, V_grid=None if len(vkeys) > 1 else V_grid, **scatter_kwargs, **kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax else: ax = pl.figure(None, figsize, dpi=dpi).gca() if ax is None else ax density = 1 if density is None else density stream_kwargs = {"linewidth": linewidth, "density": 2 * density} stream_kwargs.update(kwargs) pl.streamplot(X_grid[0], X_grid[1], V_grid[0], V_grid[1], color='grey', zorder=3, **stream_kwargs) size = 4 * default_size(adata) if size is None else size ax = scatter(adata, layer=layer, color=color, size=size, title=title, ax=ax, zorder=0, **scatter_kwargs) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax PK$k4NlZkP P !scvelo/plotting/velocity_graph.pyfrom .. 
import settings from ..tools.transition_matrix import transition_matrix from .utils import savefig_or_show, default_basis from .scatter import scatter from .docs import doc_scatter, doc_params import warnings import numpy as np import matplotlib.pyplot as pl @doc_params(scatter=doc_scatter) def velocity_graph(adata, basis=None, vkey='velocity', which_graph='velocity', n_neighbors=10, alpha=.8, perc=90, edge_width=.2, edge_color='grey', color=None, use_raw=None, layer=None, color_map=None, colorbar=True, palette=None, size=None, sort_order=True, groups=None, components=None, projection='2d', legend_loc='on data', legend_fontsize=None, legend_fontweight=None, right_margin=None, left_margin=None, xlabel=None, ylabel=None, title=None, fontsize=None, figsize=None, dpi=None, frameon=None, show=True, save=None, ax=None): """\ Plot of the velocity graph. Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` or `None` (default: `None`) Key for annotations of observations/cells or variables/genes. which_graph: `'velocity'` or `'neighbors'` (default: `'velocity'`) Whether to show transitions from velocity graph or connectivities from neighbors graph. {scatter} Returns ------- `matplotlib.Axis` if `show==False` """ basis = default_basis(adata) if basis is None else basis title = which_graph + ' graph' if title is None else title scatter_kwargs = {"basis": basis, "perc": perc, "use_raw": use_raw, "sort_order": sort_order, "alpha": alpha, "components": components, "projection": projection, "legend_loc": legend_loc, "groups": groups, "legend_fontsize": legend_fontsize, "legend_fontweight": legend_fontweight, "palette": palette, "color_map": color_map, "frameon": frameon, "title": title, "xlabel": xlabel, "ylabel": ylabel, "right_margin": right_margin, "left_margin": left_margin, "colorbar": colorbar, "dpi": dpi, "fontsize": fontsize, "show": False, "save": None, "figsize": figsize, } ax = scatter(adata, layer=layer, color=color, size=size, ax=ax, zorder=0, **scatter_kwargs) from networkx import Graph, draw_networkx_edges if which_graph == 'neighbors': T = adata.uns['neighbors']['connectivities'] if perc is not None: threshold = np.percentile(T.data, perc) T.data[T.data < threshold] = 0 T.eliminate_zeros() else: T = transition_matrix(adata, vkey=vkey, weight_indirect_neighbors=0, n_neighbors=n_neighbors, perc=perc) with warnings.catch_warnings(): warnings.simplefilter("ignore") edges = draw_networkx_edges(Graph(T), adata.obsm['X_' + basis], width=edge_width, edge_color=edge_color, ax=ax) edges.set_zorder(-2) edges.set_rasterized(settings._vector_friendly) savefig_or_show('' if basis is None else basis, dpi=dpi, save=save, show=show) if not show: return ax PKÀ/N scvelo/preprocessing/__init__.pyfrom .utils import show_proportions, cleanup, filter_genes, filter_genes_dispersion, normalize_per_cell, log1p, \ filter_and_normalize, recipe_velocity from .neighbors import pca, neighbors from .moments import moments PK1N=scvelo/preprocessing/moments.pyfrom .. import settings from .. import logging as logg from .utils import not_yet_normalized, normalize_per_cell from .neighbors import neighbors, get_connectivities, neighbors_to_be_recomputed from scipy.sparse import csr_matrix import numpy as np def moments(data, n_neighbors=30, n_pcs=30, mode='connectivities', method='umap', metric='euclidean', use_rep=None, recurse_neighbors=False, renormalize=False, copy=False): """Computes moments for velocity estimation. 
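First-order moments are neighbor-graph averages of the spliced and unspliced counts, i.e. `Ms = A @ spliced` and `Mu = A @ unspliced`, where `A` is the row-normalized connectivity matrix (with self-loops). A minimal usage sketch (any AnnData with spliced/unspliced layers works; `toy_data` is just the builtin example set):

    >>> import scvelo as scv
    >>> adata = scv.datasets.toy_data(n_obs=500)
    >>> scv.pp.filter_and_normalize(adata)
    >>> scv.pp.moments(adata, n_neighbors=30, n_pcs=30)
    >>> adata.layers['Ms'].shape  # same shape as adata.layers['spliced']
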
Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. n_neighbors: `int` (default: 30) Number of neighbors to use. n_pcs: `int` (default: 30) Number of principal components to use. mode: `'connectivities'` or `'distances'` (default: `'connectivities'`) Distance metric to use for moment computation. renormalize: `bool` (default: `False`) Renormalize the moments by total counts per cell to its median. copy: `bool` (default: `False`) Return a copy instead of writing to adata. Returns ------- Returns or updates `adata` with the attributes Ms: `.layers` dense matrix with first order moments of spliced counts. Mu: `.layers` dense matrix with first order moments of unspliced counts. """ adata = data.copy() if copy else data if 'spliced' not in adata.layers.keys() or 'unspliced' not in adata.layers.keys(): raise ValueError('Could not find spliced / unspliced counts.') if any([not_yet_normalized(adata.layers[layer]) for layer in {'spliced', 'unspliced'}]): normalize_per_cell(adata) if 'neighbors' not in adata.uns.keys() or neighbors_to_be_recomputed(adata, n_neighbors=n_neighbors): if use_rep is None: use_rep = 'X_pca' neighbors(adata, n_neighbors=n_neighbors, use_rep=use_rep, n_pcs=n_pcs, method=method, metric=metric) if mode not in adata.uns['neighbors']: raise ValueError('mode can only be \'connectivities\' or \'distances\'') logg.info('computing moments based on ' + str(mode), r=True) connectivities = get_connectivities(adata, mode, n_neighbors=n_neighbors, recurse_neighbors=recurse_neighbors) adata.layers['Ms'] = csr_matrix.dot(connectivities, csr_matrix(adata.layers['spliced'])).astype(np.float32).A adata.layers['Mu'] = csr_matrix.dot(connectivities, csr_matrix(adata.layers['unspliced'])).astype(np.float32).A if renormalize: normalize_per_cell(adata, layers={'Ms', 'Mu'}, enforce=True) logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added \n' ' \'Ms\' and \'Mu\', moments of spliced/unspliced abundances (adata.layers)') return adata if copy else None def second_order_moments(adata, adjusted=False): """Computes second order moments for stochastic velocity estimation. Arguments --------- adata: `AnnData` Annotated data matrix. Returns ------- Mss: Second order moments for spliced abundances Mus: Second order moments for spliced with unspliced abundances """ if 'neighbors' not in adata.uns: raise ValueError('You need to run `pp.neighbors` first to compute a neighborhood graph.') connectivities = get_connectivities(adata) s, u = csr_matrix(adata.layers['spliced']), csr_matrix(adata.layers['unspliced']) Mss = csr_matrix.dot(connectivities, s.multiply(s)).astype(np.float32).A Mus = csr_matrix.dot(connectivities, s.multiply(u)).astype(np.float32).A if adjusted: Mss = 2 * Mss - adata.layers['Ms'].reshape(Mss.shape) Mus = 2 * Mus - adata.layers['Mu'].reshape(Mus.shape) return Mss, Mus def second_order_moments_u(adata): """Computes second order moments for stochastic velocity estimation. Arguments --------- adata: `AnnData` Annotated data matrix. Returns ------- Muu: Second order moments for unspliced abundances """ if 'neighbors' not in adata.uns: raise ValueError('You need to run `pp.neighbors` first to compute a neighborhood graph.') connectivities = get_connectivities(adata) u = csr_matrix(adata.layers['unspliced']) Muu = csr_matrix.dot(connectivities, u.multiply(u)).astype(np.float32).A return Muu PKXN'\!scvelo/preprocessing/neighbors.pyfrom .. import settings from .. 
import logging as logg from scanpy.api import Neighbors from scanpy.api.pp import pca from scipy.sparse import issparse import numpy as np def neighbors(adata, n_neighbors=30, n_pcs=30, use_rep=None, knn=True, random_state=0, method='umap', metric='euclidean', metric_kwds={}, copy=False): """ Compute a neighborhood graph of observations [McInnes18]_. The neighbor search efficiency of this heavily relies on UMAP [McInnes18]_, which also provides a method for estimating connectivities of data points - the connectivity of the manifold (`method=='umap'`). If `method=='diffmap'`, connectivities are computed according to [Coifman05]_, in the adaption of [Haghverdi16]_. Parameters ---------- adata Annotated data matrix. n_neighbors The size of local neighborhood (in terms of number of neighboring data points) used for manifold approximation. Larger values result in more global views of the manifold, while smaller values result in more local data being preserved. In general values should be in the range 2 to 100. If `knn` is `True`, number of nearest neighbors to be searched. If `knn` is `False`, a Gaussian kernel width is set to the distance of the `n_neighbors` neighbor. n_pcs : `int` or `None` (default: None) Use this many PCs. If n_pcs==0 use .X if use_rep is None. use_rep : `None`, `'X'` or any key for `.obsm` (default: None) Use the indicated representation. If `None`, the representation is chosen automatically: for .n_vars < 50, .X is used, otherwise ‘X_pca’ is used. knn If `True`, use a hard threshold to restrict the number of neighbors to `n_neighbors`, that is, consider a knn graph. Otherwise, use a Gaussian Kernel to assign low weights to neighbors more distant than the `n_neighbors` nearest neighbor. random_state A numpy random seed. method : {{'umap', 'gauss', `sklearn`, `None`}} (default: `'umap'`) Use 'umap' [McInnes18]_ or 'gauss' (Gauss kernel following [Coifman05]_ with adaptive width [Haghverdi16]_) for computing connectivities. metric A known metric’s name or a callable that returns a distance. metric_kwds Options for the metric. copy Return a copy instead of writing to adata. Returns ------- Depending on `copy`, updates or returns `adata` with the following: connectivities : sparse matrix (`.uns['neighbors']`, dtype `float32`) Weighted adjacency matrix of the neighborhood graph of data points. Weights should be interpreted as connectivities. distances : sparse matrix (`.uns['neighbors']`, dtype `float32`) Instead of decaying weights, this stores distances for each pair of neighbors. 
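Example
-------
A minimal usage sketch (assumes `adata` already holds normalized spliced/unspliced counts):

    >>> import scvelo as scv
    >>> scv.pp.neighbors(adata, n_neighbors=30, n_pcs=30, method='umap')
    >>> adata.uns['neighbors']['connectivities']  # weighted adjacency matrix (sparse)
    >>> adata.uns['neighbors']['distances']       # pairwise neighbor distances (sparse)
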
""" logg.info('computing neighbors', r=True) adata = adata.copy() if copy else adata if adata.isview: adata._init_as_actual(adata.copy()) if (use_rep is None or use_rep is 'X_pca') \ and ('X_pca' not in adata.obsm.keys() or n_pcs > adata.obsm['X_pca'].shape[1]): pca(adata, n_comps=n_pcs, svd_solver='arpack') if method is 'sklearn': from sklearn.neighbors import NearestNeighbors neighbors = NearestNeighbors(n_neighbors=n_neighbors) neighbors.fit(adata.obsm['X_pca'] if use_rep is None else adata.obsm[use_rep]) neighbors.distances = neighbors.kneighbors_graph(mode='distance') neighbors.connectivities = neighbors.kneighbors_graph(mode='connectivity') else: neighbors = Neighbors(adata) neighbors.compute_neighbors(n_neighbors=n_neighbors, knn=knn, n_pcs=n_pcs, use_rep=use_rep, method=method, metric=metric, metric_kwds=metric_kwds, random_state=random_state, write_knn_indices=True) adata.uns['neighbors'] = {} adata.uns['neighbors']['params'] = {'n_neighbors': n_neighbors, 'method': method} adata.uns['neighbors']['distances'] = neighbors.distances adata.uns['neighbors']['connectivities'] = neighbors.connectivities if hasattr(neighbors, 'knn_indices'): adata.uns['neighbors']['indices'] = neighbors.knn_indices logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added to `.uns[\'neighbors\']`\n' ' \'distances\', weighted adjacency matrix\n' ' \'connectivities\', weighted adjacency matrix') return adata if copy else None def select_distances(dist, n_neighbors=None): D = dist.copy() n_counts = (D > 0).sum(1).A1 if issparse(D) else (D > 0).sum(1) n_neighbors = n_counts.min() if n_neighbors is None else min(n_counts.min(), n_neighbors) rows = np.where(n_counts > n_neighbors)[0] cumsum_neighs = np.insert(n_counts.cumsum(), 0, 0) dat = D.data for row in rows: n0, n1 = cumsum_neighs[row], cumsum_neighs[row + 1] rm_idx = n0 + dat[n0:n1].argsort()[n_neighbors:] dat[rm_idx] = 0 D.eliminate_zeros() return D def select_connectivities(connectivities, n_neighbors=None): C = connectivities.copy() n_counts = (C > 0).sum(1).A1 if issparse(C) else (C > 0).sum(1) n_neighbors = n_counts.min() if n_neighbors is None else min(n_counts.min(), n_neighbors) rows = np.where(n_counts > n_neighbors)[0] cumsum_neighs = np.insert(n_counts.cumsum(), 0, 0) dat = C.data for row in rows: n0, n1 = cumsum_neighs[row], cumsum_neighs[row + 1] rm_idx = n0 + dat[n0:n1].argsort()[::-1][n_neighbors:] dat[rm_idx] = 0 C.eliminate_zeros() return C def neighbors_to_be_recomputed(adata, n_neighbors=None): # check if neighbors graph is disrupted n_neighs = (adata.uns['neighbors']['distances'] > 0).sum(1) result = n_neighs.max() - n_neighs.min() >= 2 # check if neighbors graph has sufficient number of neighbors if n_neighbors is not None: result = result or n_neighbors > adata.uns['neighbors']['params']['n_neighbors'] return result def get_connectivities(adata, mode='connectivities', n_neighbors=None, recurse_neighbors=False): C = adata.uns['neighbors'][mode] if n_neighbors is not None and n_neighbors < adata.uns['neighbors']['params']['n_neighbors']: C = select_connectivities(C, n_neighbors) if mode == 'connectivities' else select_distances(C, n_neighbors) connectivities = C > 0 connectivities.setdiag(1) if recurse_neighbors: connectivities += connectivities.dot(connectivities * .5) connectivities.data = np.clip(connectivities.data, 0, 1) connectivities = connectivities.multiply(1. / connectivities.sum(1)) return connectivities.tocsr().astype(np.float32)PKyNeU1M1Mscvelo/preprocessing/utils.pyfrom .. 
import logging as logg import numpy as np from scipy.sparse import issparse from sklearn.utils import sparsefuncs from anndata import AnnData def show_proportions(adata): """Fraction of spliced/unspliced/ambiguous abundances Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. Returns ------- Prints the fractions of abundances. """ layers_keys = [key for key in ['spliced', 'unspliced', 'ambiguous'] if key in adata.layers.keys()] tot_mol_cell_layers = [adata.layers[key].sum(1) for key in layers_keys] mean_abundances = np.round( [np.mean(tot_mol_cell / np.sum(tot_mol_cell_layers, 0)) for tot_mol_cell in tot_mol_cell_layers], 2) print('Abundance of ' + str(layers_keys) + ': ' + str(mean_abundances)) def cleanup(data, clean='layers', keep=None, copy=False): """Deletes attributes not needed. Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. clean: `str` or list of `str` (default: `layers`) Which attributes to consider for freeing memory. keep: `str` or list of `str` (default: None) Which attributes to keep. copy: `bool` (default: `False`) Return a copy instead of writing to adata. Returns ------- Returns or updates `adata` with selection of attributes kept. """ adata = data.copy() if copy else data keep = list([keep]) if isinstance(keep, str) else list() if keep is None else list(keep) keep.extend(['spliced', 'unspliced', 'Ms', 'Mu', 'clusters', 'neighbors']) ann_dict = {'obs': adata.obs_keys(), 'var': adata.var_keys(), 'uns': adata.uns_keys(), 'layers': list(adata.layers.keys())} if 'all' not in clean: ann_dict = {ann: values for (ann, values) in ann_dict.items() if ann in clean} for (ann, values) in ann_dict.items(): for value in values: if value not in keep: del(getattr(adata, ann)[value]) return adata if copy else None def get_size(adata, layer=None): X = adata.X if layer is None else adata.layers[layer] return X.sum(1).A1 if issparse(X) else X.sum(1) def set_initial_size(adata, layers={'spliced', 'unspliced'}): layers = [layer for layer in layers if layer in adata.layers.keys() and 'initial_size_' + layer not in adata.obs.keys()] for layer in layers: adata.obs['initial_size_' + layer] = get_size(adata, layer) if 'initial_size' not in adata.obs.keys(): adata.obs['initial_size'] = get_size(adata) def get_initial_size(adata, layer=None, by_total_size=None): if by_total_size: return np.sum([adata.obs['initial_size_' + layer] for layer in {'spliced', 'unspliced'} if 'initial_size_' + layer in adata.obs.keys()], axis=0) elif layer in adata.layers.keys(): return np.array(adata.obs['initial_size_' + layer]) if 'initial_size_' + layer in adata.obs.keys() else get_size(adata, layer) elif layer is None or layer is 'X': return np.array(adata.obs['initial_size']) if 'initial_size' in adata.obs.keys() else get_size(adata) else: return None def filter(X, min_counts=None, min_cells=None, max_counts=None, max_cells=None): counts = X if (min_counts is not None or max_counts is not None) else X > 0 counts = counts.sum(0).A1 if issparse(counts) else counts.sum(0) lb = min_counts if min_counts is not None else min_cells if min_cells is not None else -np.inf ub = max_counts if max_counts is not None else max_cells if max_cells is not None else np.inf return (lb <= counts) & (counts <= ub), counts def filter_genes(data, min_counts=None, min_cells=None, max_counts=None, max_cells=None, min_counts_u=None, min_cells_u=None, max_counts_u=None, max_cells_u=None, min_shared_counts=None, min_shared_cells=None, copy=False): """Filter genes based on number of cells or 
counts. Keep genes that have at least `min_counts` counts or are expressed in at least `min_cells` cells or have at most `max_counts` counts or are expressed in at most `max_cells` cells. Only provide one of the optional parameters `min_counts`, `min_cells`, `max_counts`, `max_cells` per call. Parameters ---------- data : :class:`~anndata.AnnData`, `np.ndarray`, `sp.spmatrix` The (annotated) data matrix of shape `n_obs` × `n_vars`. Rows correspond to cells and columns to genes. min_counts : `int`, optional (default: `None`) Minimum number of counts required for a gene to pass filtering. min_cells : `int`, optional (default: `None`) Minimum number of cells expressed required for a gene to pass filtering. max_counts : `int`, optional (default: `None`) Maximum number of counts required for a gene to pass filtering. max_cells : `int`, optional (default: `None`) Maximum number of cells expressed required for a gene to pass filtering. min_counts_u : `int`, optional (default: `None`) Minimum number of unspliced counts required for a gene to pass filtering. min_cells_u : `int`, optional (default: `None`) Minimum number of unspliced cells expressed required for a gene to pass filtering. max_counts_u : `int`, optional (default: `None`) Maximum number of unspliced counts required for a gene to pass filtering. max_cells_u : `int`, optional (default: `None`) Maximum number of unspliced cells expressed required for a gene to pass filtering. min_shared_counts: `int`, optional (default: `None`) Minimum number of counts (in cells expressed simultaneously in unspliced and spliced) required for a gene. min_shared_cells: `int`, optional (default: `None`) Minimum number of cells required for a gene to be expressed simultaneously in unspliced and spliced. copy : `bool`, optional (default: `False`) Determines whether a copy is returned. Returns ------- Filters the object and adds `n_counts` to `adata.var`. """ adata = data.copy() if copy else data # set initial cell sizes before filtering set_initial_size(adata) layers = [layer for layer in ['spliced', 'unspliced'] if layer in adata.layers.keys()] if min_shared_counts is not None or min_shared_cells is not None: layers.extend(['shared']) for layer in layers: if layer is 'spliced': _min_counts, _min_cells, _max_counts, _max_cells = min_counts, min_cells, max_counts, max_cells elif layer is 'unspliced': _min_counts, _min_cells, _max_counts, _max_cells = min_counts_u, min_cells_u, max_counts_u, max_cells_u else: # shared counts/cells _min_counts, _min_cells, _max_counts, _max_cells = min_shared_counts, min_shared_cells, None, None if layer in adata.layers.keys(): X = adata.layers[layer] else: # shared counts/cells Xs, Xu = adata.layers['spliced'], adata.layers['unspliced'] nonzeros = (Xs > 0).multiply(Xu > 0) if issparse(Xs) else (Xs > 0) * (Xu > 0) X = nonzeros.multiply(Xs) + nonzeros.multiply(Xu) if issparse(nonzeros) else nonzeros * (Xs + Xu) gene_subset = np.ones(adata.n_vars, dtype=bool) if _min_counts is not None or _max_counts is not None: gene_subset, _ = filter(X, min_counts=_min_counts, max_counts=_max_counts) adata._inplace_subset_var(gene_subset) if _min_cells is not None or _max_cells is not None: gene_subset, _ = filter(X, min_cells=_min_cells, max_cells=_max_cells) adata._inplace_subset_var(gene_subset) s = np.sum(~gene_subset) if s > 0: logg.info('Filtered out {} genes that are detected'.format(s), end=' ') if _min_cells is not None or _min_counts is not None: logg.info('in less than', str(_min_cells) + ' cells (' + str(layer) + ').' 
if _min_counts is None else str(_min_counts) + ' counts (' + str(layer) + ').', no_indent=True) if max_cells is not None or max_counts is not None: logg.info('in more than ', str(_max_cells) + ' cells(' + str(layer) + ').' if _max_counts is None else str(_max_counts) + ' counts (' + str(layer) + ').', no_indent=True) return adata if copy else None def filter_genes_dispersion(data, flavor='seurat', min_disp=None, max_disp=None, min_mean=None, max_mean=None, n_bins=20, n_top_genes=None, log=True, copy=False): """Extract highly variable genes. The normalized dispersion is obtained by scaling with the mean and standard deviation of the dispersions for genes falling into a given bin for mean expression of genes. This means that for each bin of mean expression, highly variable genes are selected. Parameters ---------- data : :class:`~anndata.AnnData`, `np.ndarray`, `sp.sparse` The (annotated) data matrix of shape `n_obs` × `n_vars`. Rows correspond to cells and columns to genes. flavor : {'seurat', 'cell_ranger', 'svr'}, optional (default: 'seurat') Choose the flavor for computing normalized dispersion. If choosing 'seurat', this expects non-logarithmized data - the logarithm of mean and dispersion is taken internally when `log` is at its default value `True`. For 'cell_ranger', this is usually called for logarithmized data - in this case you should set `log` to `False`. In their default workflows, Seurat passes the cutoffs whereas Cell Ranger passes `n_top_genes`. min_mean=0.0125, max_mean=3, min_disp=0.5, max_disp=`None` : `float`, optional If `n_top_genes` unequals `None`, these cutoffs for the means and the normalized dispersions are ignored. n_bins : `int` (default: 20) Number of bins for binning the mean gene expression. Normalization is done with respect to each bin. If just a single gene falls into a bin, the normalized dispersion is artificially set to 1. You'll be informed about this if you set `settings.verbosity = 4`. n_top_genes : `int` or `None` (default: `None`) Number of highly-variable genes to keep. log : `bool`, optional (default: `True`) Use the logarithm of the mean to variance ratio. copy : `bool`, optional (default: `False`) If an :class:`~anndata.AnnData` is passed, determines whether a copy is returned. Returns ------- If an AnnData `adata` is passed, returns or updates `adata` depending on \ `copy`. It filters the `adata` and adds the annotations """ adata = data.copy() if copy else data set_initial_size(adata) if n_top_genes is not None and adata.n_vars < n_top_genes: logg.info('Skip filtering by dispersion since number of variables are less than `n_top_genes`') else: if flavor is 'svr': mu = adata.X.mean(0).A1 if issparse(adata.X) else adata.X.mean(0) sigma = np.sqrt(adata.X.multiply(adata.X).mean(0).A1 - mu ** 2) if issparse(adata.X) else adata.X.std(0) log_mu = np.log2(mu) log_cv = np.log2(sigma / mu) from sklearn.svm import SVR clf = SVR(gamma=150. 
/ len(mu))
            clf.fit(log_mu[:, None], log_cv)
            score = log_cv - clf.predict(log_mu[:, None])
            nth_score = np.sort(score)[::-1][n_top_genes]
            adata._inplace_subset_var(score >= nth_score)
        else:
            from scanpy.api.pp import filter_genes_dispersion
            filter_genes_dispersion(adata, flavor=flavor, min_disp=min_disp, max_disp=max_disp, min_mean=min_mean,
                                    max_mean=max_mean, n_bins=n_bins, n_top_genes=n_top_genes, log=log)
    return adata if copy else None


def counts_per_cell_quantile(X, max_proportion_per_cell=.05, counts_per_cell=None):
    if counts_per_cell is None:
        counts_per_cell = X.sum(1).A1 if issparse(X) else X.sum(1)
    gene_subset = np.all(X <= counts_per_cell[:, None] * max_proportion_per_cell, axis=0)
    if issparse(X):
        gene_subset = gene_subset.A1
    return X[:, gene_subset].sum(1).A1 if issparse(X) else X[:, gene_subset].sum(1)


def not_yet_normalized(X):
    return np.allclose((X.data[:10] if issparse(X) else X[:, 0]) % 1, 0, atol=1e-3)


def normalize_per_cell(data, counts_per_cell_after=None, counts_per_cell=None, key_n_counts=None,
                       max_proportion_per_cell=None, use_initial_size=True, layers=['spliced', 'unspliced'],
                       enforce=False, copy=False):
    """Normalize each cell by total counts over all genes.

    Parameters
    ----------
    data : :class:`~anndata.AnnData`, `np.ndarray`, `sp.sparse`
        The (annotated) data matrix of shape `n_obs` × `n_vars`.
        Rows correspond to cells and columns to genes.
    counts_per_cell_after : `float` or `None`, optional (default: `None`)
        If `None`, after normalization, each cell has a total count equal
        to the median of the *counts_per_cell* before normalization.
    counts_per_cell : `np.array`, optional (default: `None`)
        Precomputed counts per cell.
    key_n_counts : `str`, optional (default: `'n_counts'`)
        Name of the field in `adata.obs` where the total counts per cell are stored.
    max_proportion_per_cell : `int` (default: `None`)
        Exclude genes whose counts account for more than a specific proportion of cell size, e.g. 0.05.
    use_initial_size : `bool` (default: `True`)
        Whether to use initial cell sizes or actual cell sizes.
    layers : `str` or `list` (default: `{'spliced', 'unspliced'}`)
        Keys for layers to be also considered for normalization.
    copy : `bool`, optional (default: `False`)
        If an :class:`~anndata.AnnData` is passed, determines whether a copy is returned.

    Returns
    -------
    Returns or updates `adata` with normalized version of the original `adata.X`, depending on `copy`.
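
    Examples
    --------
    A minimal sketch of the median size-factor scaling performed here, shown on a small
    dense count matrix for illustration (the actual function operates on `adata.X` and the
    spliced/unspliced layers, and skips data that already looks normalized):

    >>> import numpy as np
    >>> X = np.array([[2., 2.], [4., 4.], [8., 8.]])
    >>> counts = X.sum(1)                     # counts per cell: [4., 8., 16.]
    >>> factors = counts / np.median(counts)  # scale relative to the median cell size
    >>> X / factors[:, None]                  # every cell now sums to the median (8)
    array([[4., 4.],
           [4., 4.],
           [4., 4.]])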
""" adata = data.copy() if copy else data layers = adata.layers.keys() if layers is 'all' else [layers] if isinstance(layers, str) \ else [layer for layer in layers if layer in adata.layers.keys()] layers = ['X'] + layers modified_layers = [] for layer in layers: X = adata.X if layer is 'X' else adata.layers[layer] if not_yet_normalized(X) or enforce: counts = counts_per_cell if counts_per_cell is not None \ else get_initial_size(adata, layer) if use_initial_size else get_size(adata, layer) if max_proportion_per_cell is not None and (0 < max_proportion_per_cell < 1): counts = counts_per_cell_quantile(X, max_proportion_per_cell, counts) # equivalent to scanpy.pp.normalize_per_cell(X, counts_per_cell_after, counts) counts_after = np.median(counts) if counts_per_cell_after is None else counts_per_cell_after counts /= counts_after + (counts_after == 0) counts += counts == 0 # to avoid division by zero if issparse(X): sparsefuncs.inplace_row_scale(X, 1 / counts) else: X /= np.array(counts[:, None]) modified_layers.append(layer) adata.obs['n_counts' if key_n_counts is None else key_n_counts] = get_size(adata) if len(modified_layers) > 0: logg.info('Normalized count data:', ', '.join(modified_layers) + '.') return adata if copy else None def log1p(data, copy=False): """Logarithmize the data matrix. Computes :math:`X = \\log(X + 1)`, where :math:`log` denotes the natural logarithm. Parameters ---------- data: :class:`~anndata.AnnData` Annotated data matrix. copy: `bool` (default: `False`) Return a copy of `adata` instead of updating it. Returns ------- Returns or updates `adata` depending on `copy`. """ adata = data.copy() if copy else data X = (adata.X.data if issparse(adata.X) else adata.X) if isinstance(adata, AnnData) else adata np.log1p(X, out=X) return adata if copy else None def filter_and_normalize(data, min_counts=None, min_counts_u=None, min_cells=None, min_cells_u=None, min_shared_counts=None, min_shared_cells=None, n_top_genes=None, flavor='seurat', log=True, copy=False): """Filtering, normalization and log transform Expects non-logarithmized data. If using logarithmized data, pass `log=False`. Runs the following steps .. code:: python scv.pp.filter_genes(adata) scv.pp.normalize_per_cell(adata) if n_top_genes is not None: scv.pp.filter_genes_dispersion(adata) if log: scv.pp.log1p(adata) Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. min_counts: `int` (default: `None`) Minimum number of counts required for a gene to pass filtering (spliced). min_counts_u: `int` (default: `None`) Minimum number of counts required for a gene to pass filtering (unspliced). min_cells: `int` (default: `None`) Minimum number of cells expressed required for a gene to pass filtering (spliced). min_cells_u: `int` (default: `None`) Minimum number of cells expressed required for a gene to pass filtering (unspliced). min_shared_counts: `int`, optional (default: `None`) Minimum number of counts (in cells expressed simultaneously in unspliced and spliced) required for a gene. min_shared_cells: `int`, optional (default: `None`) Minimum number of cells required for a gene to be expressed simultaneously in unspliced and spliced. n_top_genes: `int` (default: `None`) Number of genes to keep. flavor: {'seurat', 'cell_ranger', 'svr'}, optional (default: 'seurat') Choose the flavor for computing normalized dispersion. If choosing 'seurat', this expects non-logarithmized data. log: `bool` (default: `True`) Take logarithm. 
copy: `bool` (default: `False`) Return a copy of `adata` instead of updating it. Returns ------- Returns or updates `adata` depending on `copy`. """ adata = data.copy() if copy else data if 'spliced' not in adata.layers.keys() or 'unspliced' not in adata.layers.keys(): raise ValueError('Could not find spliced / unspliced counts.') filter_genes(adata, min_counts=min_counts, min_counts_u=min_counts_u, min_cells=min_cells, min_cells_u=min_cells_u, min_shared_counts=min_shared_counts, min_shared_cells=min_shared_cells,) normalize_per_cell(adata) if n_top_genes is not None: filter_genes_dispersion(adata, n_top_genes=n_top_genes, flavor=flavor) log_advised = np.allclose(adata.X[:10].sum(), adata.layers['spliced'][:10].sum()) if log and log_advised: log1p(adata) logg.info('Logarithmized X.' if log and log_advised else 'Did not modify X as it looks preprocessed already.' if log else 'Consider logarithmizing X with `scv.pp.log1p` for better results.' if log_advised else '') return adata if copy else None def recipe_velocity(adata, min_counts=3, min_counts_u=3, n_top_genes=None, n_pcs=30, n_neighbors=30, log=True, copy=False): """Runs pp.filter_and_normalize() and pp.moments() """ from .moments import moments filter_and_normalize(adata, min_counts=min_counts, min_counts_u=min_counts_u, n_top_genes=n_top_genes, log=log) moments(adata, n_neighbors=n_neighbors, n_pcs=n_pcs) return adata if copy else None PKfvN; scvelo/tools/__init__.pyfrom .velocity import velocity, velocity_genes from .velocity_graph import velocity_graph from .transition_matrix import transition_matrix from .velocity_embedding import velocity_embedding from .velocity_confidence import velocity_confidence, velocity_confidence_transition from .terminal_states import cell_fate, cell_origin, eigs, terminal_states from .rank_velocity_genes import velocity_clusters, rank_velocity_genes from .velocity_pseudotime import velocity_map, velocity_pseudotime from scanpy.api.tl import tsne, umap, diffmap, dpt, louvain, paga from .dynamical_model import DynamicsRecovery, recover_dynamics, dynamical_velocity PK}NKMMscvelo/tools/dynamical_model.pyfrom .. import settings from .. 
import logging as logg from .utils import make_dense, make_unique_list from .dynamical_model_utils import BaseDynamics, unspliced, spliced, vectorize, derivatives, \ find_swichting_time, fit_alpha, fit_scaling, linreg, convolve, assign_timepoints, tau_inv import warnings import numpy as np import matplotlib.pyplot as pl from matplotlib import rcParams class DynamicsRecovery(BaseDynamics): def __init__(self, adata=None, gene=None, u=None, s=None, use_raw=False, load_pars=None, fix_scaling=None): super(DynamicsRecovery, self).__init__(adata.n_obs) _layers = adata[:, gene].layers self.use_raw = use_raw = use_raw or 'Ms' not in _layers.keys() # extract actual data if u is None or s is None: u = make_dense(_layers['unspliced']) if use_raw else make_dense(_layers['Mu']) s = make_dense(_layers['spliced']) if use_raw else make_dense(_layers['Ms']) self.s, self.u = s, u s_raw = s if use_raw else make_dense(_layers['spliced']) u_raw = u if use_raw else make_dense(_layers['unspliced']) # set weights for fitting (exclude dropouts and extreme outliers) nonzero = np.ravel(s_raw > 0) & np.ravel(u_raw > 0) s_filter = np.ravel(s < np.percentile(s[nonzero], 98)) u_filter = np.ravel(u < np.percentile(u[nonzero], 98)) self.weights = s_filter & u_filter & nonzero self.fix_scaling = fix_scaling if load_pars and 'fit_alpha' in adata.var.keys(): self.load_pars(adata, gene) else: self.initialize() def initialize(self): u, s, w = self.u, self.s, self.weights u_w, s_w, perc = u[w], s[w], 95 if self.fix_scaling is None or self.fix_scaling is False: self.scaling = u[w].max(0) / s[w].max(0) * 1.3 else: self.scaling = self.fix_scaling u, u_w = self.u / self.scaling, u_w / self.scaling # initialize beta and gamma from extreme quantiles of s if False: weights_s = s_w >= np.percentile(s_w, perc, axis=0) else: us_norm = s_w / np.clip(np.max(s_w, axis=0), 1e-3, None) + u_w / np.clip(np.max(u_w, axis=0), 1e-3, None) weights_s = us_norm >= np.percentile(us_norm, perc, axis=0) beta, gamma = 1, linreg(convolve(u_w, weights_s), convolve(s_w, weights_s)) # initialize alpha and switching points from extreme quantiles of u weights_u = u_w >= np.percentile(u_w, perc, axis=0) u_w, s_w = u_w[weights_u], s_w[weights_u] alpha, u0_, s0_ = u_w.mean(), u_w.mean(), s_w.mean() alpha_, u0, s0, = 0, 0, 0 t, tau, o = assign_timepoints(u, s, alpha, beta, gamma, u0_=u0_, s0_=s0_) # update object with initialized vars self.alpha, self.beta, self.gamma, self.alpha_ = alpha, beta, gamma, alpha_ self.u0, self.s0, self.u0_, self.s0_ = u0, s0, u0_, s0_ self.t, self.tau, self.o, self.t_ = t, tau, o, np.max(tau * o) self.pars = np.array([alpha, beta, gamma, self.t_, self.scaling])[:, None] self.loss = [self.get_loss()] self.update_state_dependent() # self.update_scaling() def load_pars(self, adata, gene): idx = np.where(adata.var_names == gene)[0][0] if isinstance(gene, str) else gene self.alpha = adata.var['fit_alpha'][idx] self.beta = adata.var['fit_beta'][idx] self.gamma = adata.var['fit_gamma'][idx] self.scaling = adata.var['fit_scaling'][idx] self.t_ = adata.var['fit_t_'][idx] self.t = adata.layers['fit_t'][:, idx] self.u0, self.s0, self.alpha_ = 0, 0, 0 self.u0_ = unspliced(self.t_, self.u0, self.alpha, self.beta) self.s0_ = spliced(self.t_, self.u0, self.s0, self.alpha, self.beta, self.gamma) self.update_state_dependent() def fit(self, max_iter=100, r=None, method=None, clip_loss=None): improved, idx_update = True, np.clip(int(max_iter / 10), 1, None) for i in range(max_iter): self.update_vars(r=r, method=method, clip_loss=clip_loss) if improved or 
(i % idx_update == 1) or i == max_iter - 1: improved = self.update_state_dependent() if i > 10 and (i % idx_update == 1): loss_prev, loss = np.max(self.loss[-10:]), self.loss[-1] if loss_prev - loss < loss_prev * 1e-3: improved = self.shuffle_pars() if not improved: break self.update_state_dependent() def update_state_dependent(self): u, s, w = self.u / self.scaling, self.s, self.weights u_w, s_w = (u, s) if w is None else (u[w], s[w]) alpha, beta, gamma, scaling = self.alpha, self.beta, self.gamma, self.scaling improved_tau, improved_alpha, improved_scaling = False, False, False # find optimal switching (generalized lin.reg) & assign timepoints/states (explicit) tau_w, o_w = (self.tau, self.o) if w is None else (self.tau[w], self.o[w]) t0_ = find_swichting_time(u_w, s_w, tau_w, o_w, alpha, beta, gamma) t0_vals = t0_ + np.linspace(-1, 1, num=5) * t0_ / 10 for t0_ in t0_vals: improved_tau = improved_tau or self.update_loss(t_=t0_, reassign_time=True) # update if improved # fit alpha (generalized lin.reg) tau_w, o_w = (self.tau, self.o) if w is None else (self.tau[w], self.o[w]) alpha = fit_alpha(u_w, s_w, tau_w, o_w, beta, gamma) alpha = self.alpha if alpha is None or alpha != alpha else alpha alpha_vals = alpha + np.linspace(-1, 1, num=5) * alpha / 10 for alpha in alpha_vals: improved_alpha = improved_alpha or self.update_loss(alpha=alpha, reassign_time=True) # update if improved # fit scaling (generalized lin.reg) if self.fix_scaling is None: t_w, tau_w, o_w = (self.t, self.tau, self.o) if w is None else (self.t[w], self.tau[w], self.o[w]) alpha, t0_ = self.alpha, self.t_ scaling = fit_scaling(u_w, t_w, t0_, alpha, beta) improved_scaling = self.update_loss(scaling=scaling*self.scaling, reassign_time=True) # update if improved return improved_tau or improved_alpha or improved_scaling def update_scaling(self): # fit scaling and update if improved u, s, t, tau, o = self.u / self.scaling, self.s, self.t, self.tau, self.o alpha, beta, gamma, alpha_ = self.alpha, self.beta, self.gamma, self.alpha_ u0, s0, u0_, s0_, t_ = self.u0, self.s0, self.u0_, self.s0_, 0 if self.t_ is None else self.t_ # fit alpha and scaling and update if improved alpha = fit_alpha(u, s, tau, o, beta, gamma) t0_ = find_swichting_time(u, s, tau, o, alpha, beta, gamma) t, tau, o = assign_timepoints(u, s, alpha, beta, gamma, t0_) improved_alpha = self.update_loss(t, t0_, alpha=alpha) # fit scaling and update if improved scaling = fit_scaling(u, t, t_, alpha, beta) * self.scaling t0_ = find_swichting_time(u, s, tau, o, alpha, beta, gamma) t, tau, o = assign_timepoints(u, s, alpha, beta, gamma, t0_) improved_scaling = self.update_loss(t, t0_, scaling=scaling) return improved_alpha or improved_scaling def update_vars(self, r=None, method=None, clip_loss=None): if r is None: r = 1e-2 if method is 'adam' else 1e-6 if clip_loss is None: clip_loss = False if method is 'adam' else True # if self.weights is None: # self.uniform_weighting(n_regions=5, perc=95) t, t_, alpha, beta, gamma, scaling = self.t, self.t_, self.alpha, self.beta, self.gamma, self.scaling dalpha, dbeta, dgamma, dalpha_, dtau, dt_ = derivatives(self.u, self.s, t, t_, alpha, beta, gamma, scaling) if method is 'adam': b1, b2, eps = 0.9, 0.999, 1e-8 # update 1st and 2nd order gradient moments dpars = np.array([dalpha, dbeta, dgamma]) m_dpars = b1 * self.m_dpars[:, -1] + (1 - b1) * dpars v_dpars = b2 * self.v_dpars[:, -1] + (1 - b2) * dpars**2 self.dpars = np.c_[self.dpars, dpars] self.m_dpars = np.c_[self.m_dpars, m_dpars] self.v_dpars = np.c_[self.v_dpars, v_dpars] 
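# Adam step (what follows): m_dpars and v_dpars are exponential moving averages of the gradient and of the
# squared gradient, m_t = b1 * m_{t-1} + (1 - b1) * g_t and v_t = b2 * v_{t-1} + (1 - b2) * g_t ** 2.
# Both start at zero and are therefore biased towards zero early on; dividing by (1 - b1 ** t) and
# (1 - b2 ** t) below corrects this bias (note that `t` is reused here as the update counter, not as the
# latent time vector), and each rate parameter is then shifted by r * m_hat / (sqrt(v_hat) + eps),
# with an update kept only if the new value remains positive.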
# correct for bias t = len(self.m_dpars[0]) m_dpars /= (1 - b1 ** t) v_dpars /= (1 - b2 ** t) # Adam parameter update # Parameters are restricted to be positive n_alpha = alpha - r * m_dpars[0] / (np.sqrt(v_dpars[0]) + eps) alpha = n_alpha if n_alpha > 0 else alpha n_beta = beta - r * m_dpars[1] / (np.sqrt(v_dpars[1]) + eps) beta = n_beta if n_beta > 0 else beta n_gamma = gamma - r * m_dpars[2] / (np.sqrt(v_dpars[2]) + eps) gamma = n_gamma if n_gamma > 0 else gamma else: # Parameters are restricted to be positive n_alpha = alpha - r * dalpha alpha = n_alpha if n_alpha > 0 else alpha n_beta = beta - r * dbeta beta = n_beta if n_beta > 0 else beta n_gamma = gamma - r * dgamma gamma = n_gamma if n_gamma > 0 else gamma # tau -= r * dtau # t_ -= r * dt_ # t_ = np.max(self.tau * self.o) # t = tau * self.o + (tau + t_) * (1 - self.o) improved_vars = self.update_loss(alpha=alpha, beta=beta, gamma=gamma, clip_loss=clip_loss) def update_loss(self, t=None, t_=None, alpha=None, beta=None, gamma=None, scaling=None, reassign_time=False, clip_loss=True, report=False): vals = [t_, alpha, beta, gamma, scaling] vals_prev = [self.t_, self.alpha, self.beta, self.gamma, self.scaling] vals_name = ['t_', 'alpha', 'beta', 'gamma', 'scaling'] new_vals, new_vals_prev, new_vals_name = [], [], [] loss_prev = self.loss[-1] if len(self.loss) > 0 else 1e6 for val, val_prev, val_name in zip(vals, vals_prev, vals_name): if val is not None: new_vals.append(val) new_vals_prev.append(val_prev) new_vals_name.append(val_name) if reassign_time: t_ = self.get_optimal_switch(alpha, beta, gamma) if t_ is None else t_ t, tau, o = self.get_time_assignment(t_, alpha, beta, gamma) loss = self.get_loss(t, t_, alpha, beta, gamma, scaling) perform_update = not clip_loss or loss < loss_prev if perform_update: if len(self.loss) > 0 and loss_prev - loss > loss_prev * .01 and report: # improvement by at least 1% print('Update:', ' '.join(map(str, new_vals_name)), ' '.join(map(str, np.round(new_vals_prev, 2))), '-->', ' '.join(map(str, np.round(new_vals, 2)))) print(' loss:', np.round(loss_prev, 2), '-->', np.round(loss, 2)) if 't_' in new_vals_name or reassign_time: self.t = t self.t_ = t_ self.o = o = np.array(t <= t_, dtype=bool) self.tau = t * o + (t - t_) * (1 - o) if 'alpha' in new_vals_name: self.alpha = alpha if 'beta' in new_vals_name: self.beta = beta if 'gamma' in new_vals_name: self.gamma = gamma if 'scaling' in new_vals_name: self.scaling = scaling self.pars = np.c_[self.pars, np.array([self.alpha, self.beta, self.gamma, self.t_, self.scaling])[:, None]] self.loss.append(loss if perform_update else loss_prev) return perform_update def shuffle_pars(self, alpha_sight=[-.5, .5], gamma_sight=[-.5, .5], num=5): alpha_vals = np.linspace(alpha_sight[0], alpha_sight[1], num=num) * self.alpha + self.alpha gamma_vals = np.linspace(gamma_sight[0], gamma_sight[1], num=num) * self.gamma + self.gamma x, y = alpha_vals, gamma_vals f = lambda x, y: self.get_loss(alpha=x, gamma=y, reassign_time=True) z = np.zeros((len(x), len(x))) for i, xi in enumerate(x): for j, yi in enumerate(y): z[i, j] = f(xi, yi) ix, iy = np.unravel_index(z.argmin(), z.shape) return self.update_loss(alpha=x[ix], gamma=y[ix], reassign_time=True) def read_pars(adata, pars_names=['alpha', 'beta', 'gamma', 't_', 'scaling'], key='fit'): pars = [] for name in pars_names: pkey = key + '_' + name par = adata.var[pkey].values if pkey in adata.var.keys() else np.zeros(adata.n_vars) * np.nan pars.append(par) return pars def write_pars(adata, pars, pars_names=['alpha', 'beta', 
'gamma', 't_', 'scaling'], add_key='fit'): for i, name in enumerate(pars_names): adata.var[add_key + '_' + name] = pars[i] def recover_dynamics(data, var_names='velocity_genes', max_iter=100, learning_rate=None, add_key='fit', t_max=None, use_raw=False, min_loss=True, fix_scaling=None, load_pars=None, return_model=False, plot_results=False, copy=False, **kwargs): """Estimates velocities in a gene-specific manner Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. Returns ------- Returns or updates `adata` """ adata = data.copy() if copy else data logg.info('recovering dynamics', r=True) if isinstance(var_names, str): var_names = adata.var_names[adata.var[var_names]] if 'genes' in var_names and var_names in adata.var.keys() else adata.var_names var_names = make_unique_list(var_names, allow_array=True) var_names = [name for name in var_names if name in adata.var_names] alpha, beta, gamma, t_, scaling = read_pars(adata) idx = [] L, P, T = [], [], adata.layers['fit_t'] if 'fit_t' in adata.layers.keys() else np.zeros(adata.shape) * np.nan progress = logg.ProgressReporter(len(var_names)) for i, gene in enumerate(var_names): dm = DynamicsRecovery(adata, gene, use_raw=use_raw, load_pars=load_pars, fix_scaling=fix_scaling) if max_iter > 1: dm.fit(max_iter, learning_rate, **kwargs) ix = np.where(adata.var_names == gene)[0][0] idx.append(ix) alpha[ix], beta[ix], gamma[ix], t_[ix], scaling[ix] = dm.pars[:, np.argmin(dm.loss) if min_loss else -1] T[:, ix] = dm.t L.append(dm.loss) if plot_results and i < 4: P.append(dm.pars) progress.update() progress.finish() with warnings.catch_warnings(): warnings.simplefilter("ignore") T_max = np.nanpercentile(T, 95, axis=0) - np.nanpercentile(T, 5, axis=0) m = t_max / T_max if t_max is not None else np.ones(adata.n_vars) alpha, beta, gamma, T, t_ = alpha / m, beta / m, gamma / m, T * m, t_ * m write_pars(adata, [alpha, beta, gamma, t_, scaling]) adata.layers['fit_t'] = T cur_len = adata.varm['loss'].shape[1] if 'loss' in adata.varm.keys() else 2 max_len = max(np.max([len(l) for l in L]), cur_len) loss = np.ones((adata.n_vars, max_len)) * np.nan if 'loss' in adata.varm.keys(): loss[:, :cur_len] = adata.varm['loss'] loss[idx] = np.vstack([np.concatenate([l, np.ones(max_len-len(l)) * np.nan]) for l in L]) adata.varm['loss'] = loss logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint('added \n' ' \'' + add_key + '_pars' + '\', fitted parameters for splicing dynamics (adata.var)') if plot_results: # Plot Parameter Stats figsize = [12, 5] # rcParams['figure.figsize'] fontsize = rcParams['font.size'] fig, axes = pl.subplots(nrows=len(var_names[:4]), ncols=6, figsize=figsize) pl.subplots_adjust(wspace=0.7, hspace=0.5) for i, gene in enumerate(var_names[:4]): P[i] *= np.array([1 / m[idx[i]], 1 / m[idx[i]], 1 / m[idx[i]], m[idx[i]], 1])[:, None] for j, pij in enumerate(P[i]): axes[i][j].plot(pij) axes[i][len(P[i])].plot(L[i]) if i == 0: for j, name in enumerate(['alpha', 'beta', 'gamma', 't_', 'scaling', 'loss']): axes[i][j].set_title(name, fontsize=fontsize) return dm if return_model else adata if copy else None def dynamical_velocity(data, vkey='dynamical_velocity', mode='soft', perc_ss=None, use_raw=False, copy=False): adata = data.copy() if copy else data if 'fit_alpha' not in adata.var.keys(): raise ValueError('Run tl.recover_dynamics first.') logg.info('computing dynamical velocities', r=True) alpha = adata.var['fit_alpha'].values beta = adata.var['fit_beta'].values gamma = adata.var['fit_gamma'].values z = 
adata.var['fit_scaling'].values t = adata.layers['fit_t'] t_ = adata.var['fit_t_'].values if use_raw or 'Ms' not in adata.layers.keys(): u = adata.layers['unspliced'] / z s = adata.layers['spliced'] else: u = adata.layers['Mu'] / z s = adata.layers['Ms'] if mode is 'soft': u0 = unspliced(t_, 0, alpha, beta) s0 = spliced(t_, 0, 0, alpha, beta, gamma) tau = tau_inv(u, s, 0, 0, alpha, beta, gamma) tau = np.clip(tau, 0, t_) tau_ = tau_inv(u, s, u0, s0, 0, beta, gamma) tau_ = np.clip(tau_, 0, np.max(tau_[s > 0], axis=0)) ut = unspliced(tau, 0, alpha, beta) ut_ = unspliced(tau_, u0, 0, beta) st = spliced(tau, 0, 0, alpha, beta, gamma) st_ = spliced(tau_, u0, s0, 0, beta, gamma) ut_ss = alpha / beta st_ss = alpha / gamma var_ut = np.var(u - ut, 0) var_ut_ = np.var(u - ut_, 0) var_st = np.var(s - st, 0) var_st_ = np.var(s - st_, 0) if perc_ss is None: var_ut_ss = np.var(u, 0) var_st_ss = np.var(s, 0) var_ut_ss_ = np.var(u - ut_ss, 0) var_st_ss_ = np.var(s - st_ss, 0) else: from .optimization import get_weight w = ~ get_weight(u, s, perc_ss, 'l2') w_ = ~ get_weight(ut_ss - u, st_ss - s, perc_ss, 'l2') w = w / w w_ = w_ / w_ var_ut_ss = np.nanvar(w * u, 0) var_st_ss = np.nanvar(w * s, 0) var_ut_ss_ = np.nanvar(w_ * (u - ut_ss), 0) var_st_ss_ = np.nanvar(w_ * (s - st_ss), 0) expx = np.exp(- .5 * ((u - ut) ** 2 / var_ut + (s - st) ** 2 / var_st)) expx_ = np.exp(- .5 * ((u - ut_) ** 2 / var_ut_ + (s - st_) ** 2 / var_st_)) expx_ss = np.exp(- .5 * (u ** 2 / var_ut_ss + s ** 2 / var_st_ss)) expx_ss_ = np.exp(- .5 * ((u - ut_ss) ** 2 / var_ut_ss_ + (s - st_ss) ** 2 / var_st_ss_)) div = 1 / (2 * np.pi) l = div / np.sqrt(var_ut * var_st) * expx l_ = div / np.sqrt(var_ut_ * var_st_) * expx_ l_ss = div / np.sqrt(var_ut_ss * var_st_ss) * expx_ss l_ss_ = div / np.sqrt(var_ut_ss_ * var_st_ss_) * expx_ss_ l_sum = l + l_ + l_ss + l_ss_ o = l / l_sum o_ = l_ / l_sum u = ut * o + ut_ * o_ s = st * o + st_ * o_ alpha = alpha * o elif mode is 'hard': o = np.array(t <= t_, dtype=int) tau, alpha, u0, s0 = vectorize(t, t_, alpha, beta, gamma,) u = unspliced(tau, u0, alpha, beta) s = spliced(tau, s0, u0, alpha, beta, gamma) adata.layers[vkey] = u * beta - s * gamma adata.layers[vkey + '_u'] = alpha - beta * u logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint('added \n' ' \'' + vkey + '\', velocity vectors for each individual cell (adata.layers)') return adata if copy else NonePK VNd@uu%scvelo/tools/dynamical_model_utils.pyfrom .utils import make_dense import matplotlib.pyplot as pl from matplotlib import rcParams from mpl_toolkits.axes_grid1.inset_locator import inset_axes from scipy.sparse import issparse import warnings import numpy as np exp = np.exp """Helper functions""" def log(x, eps=1e-6): # to avoid invalid values for log. 
return np.log(np.clip(x, eps, 1 - eps)) def inv(x): with warnings.catch_warnings(): warnings.simplefilter("ignore") x_inv = 1 / x * (x != 0) return x_inv def convolve(x, weights=None): return (weights.multiply(x).tocsr() if issparse(weights) else weights * x) if weights is not None else x def apply_weight(arrays, w=None): return arrays if w is None else [array[w] for array in arrays] def linreg(u, s): # linear regression fit ss_ = s.multiply(s).sum(0) if issparse(s) else (s ** 2).sum(0) us_ = s.multiply(u).sum(0) if issparse(s) else (s * u).sum(0) return us_ / ss_ """Dynamics delineation""" def unspliced(tau, u0, alpha, beta): expu = exp(-beta * tau) return u0 * expu + alpha / beta * (1 - expu) def spliced(tau, s0, u0, alpha, beta, gamma): c = (alpha - u0 * beta) * inv(gamma - beta) expu, exps = exp(-beta * tau), exp(-gamma * tau) return s0 * exps + alpha / gamma * (1 - exps) + c * (exps - expu) def unspliced_spliced(tau, s0, u0, alpha, beta, gamma): c = (alpha - u0 * beta) * inv(gamma - beta) expu, exps = exp(-beta * tau), exp(-gamma * tau) u = u0 * expu + alpha / beta * (1 - expu) s = s0 * exps + alpha / gamma * (1 - exps) + c * (exps - expu) return u, s def tau_u(u, u0, alpha, beta): u_ratio = (u - alpha / beta) / (u0 - alpha / beta) return - 1 / beta * log(u_ratio) def tau_inv(u, s=None, u0=None, s0=None, alpha=None, beta=None, gamma=None): if s is None: c0 = u0 - alpha / beta cu = u - alpha / beta tau = - 1 / beta * log(cu / c0) else: beta_ = beta * inv(gamma - beta) ceta_ = alpha / gamma - beta_ * (alpha / beta) c0 = s0 - beta_ * u0 - ceta_ cs = s - beta_ * u - ceta_ tau = - 1 / gamma * log(cs / c0) return tau def find_swichting_time(u, s, tau, o, alpha, beta, gamma): off, on = o == 0, o == 1 if off.sum() > 0: u_, s_, tau_ = u[off], s[off], tau[off] beta_ = beta * inv(gamma - beta) ceta_ = alpha / gamma - beta_ * alpha / beta x = - ceta_ * exp(-gamma * tau_) y = s_ - beta_ * u_ exp_t0_ = (y * x).sum() / (x ** 2).sum() t0_ = -1 / gamma * log(exp_t0_ + 1) if -1 < exp_t0_ < 0 else np.max(tau[on]) if on.sum() > 0 else np.max(tau) else: t0_ = np.max(tau) return t0_ def compute_state_likelihoods(u, s, alpha, beta, gamma, t0_=None, u0_=None, s0_=None, adjust_variance=False, normalized=True, mode='hard'): def nanvar(x, axis=0): return np.nanvar(x, axis) if np.isnan(x).any() else np.var(x, axis) if t0_ is None: t0_ = tau_inv(u0_, s0_, 0, 0, alpha, beta, gamma) if u0_ is None or s0_ is None: u0_, s0_ = (unspliced(t0_, 0, alpha, beta), spliced(t0_, 0, 0, alpha, beta, gamma)) # compute inverse timepoints if mode is 'projection': t0 = tau_u(np.min(u[s > 0]), u0_, 0, beta) tpoints = np.linspace(0, t0_, num=200) tpoints_ = np.linspace(0, t0, num=200)[1:] x_obs = np.vstack([u, s]).T xt = np.vstack([unspliced(tpoints, 0, alpha, beta), spliced(tpoints, 0, 0, alpha, beta, gamma)]).T xt_ = np.vstack([unspliced(tpoints_, u0_, 0, beta), spliced(tpoints_, s0_, u0_, 0, beta, gamma)]).T # assign time points (oth. 
projection onto 'on' and 'off' curve) tau, tau_ = np.zeros(len(u)), np.zeros(len(u)) for i, xi in enumerate(x_obs): diffx, diffx_ = ((xt - xi)**2).sum(1), ((xt_ - xi)**2).sum(1) tau[i] = tpoints[np.argmin(diffx)] tau_[i] = tpoints_[np.argmin(diffx_)] else: tau = tau_inv(u, s, 0, 0, alpha, beta, gamma) tau = np.clip(tau, 0, t0_) tau_ = tau_inv(u, s, u0_, s0_, 0, beta, gamma) tau_ = np.clip(tau_, 0, np.max(tau_[s > 0])) # compute distances from states (induction state, repression state, steady state) distu, distu_, = u - unspliced(tau, 0, alpha, beta), u - unspliced(tau_, u0_, 0, beta) dists, dists_, = s - spliced(tau, 0, 0, alpha, beta, gamma), s - spliced(tau_, u0_, s0_, 0, beta, gamma) distu_steady, distu_steady_ = u - alpha / beta, u dists_steady, dists_steady_ = s - alpha / gamma, s # compute variances of distances varu, varu_ = nanvar(distu), nanvar(distu_) vars, vars_ = nanvar(dists), nanvar(dists_) varu_steady, varu_steady_ = nanvar(distu_steady), nanvar(distu_steady_) vars_steady, vars_steady_ = nanvar(dists_steady), nanvar(dists_steady_) # compute variance weighted distances distx = distu ** 2 / varu + dists ** 2 / vars distx_ = distu_ ** 2 / varu_ + dists_ ** 2 / vars_ distx_steady = distu_steady ** 2 / varu_steady + dists_steady ** 2 / vars_steady distx_steady_ = distu_steady_ ** 2 / varu_steady_ + dists_steady_ ** 2 / vars_steady_ if adjust_variance: # recompute variance weighted distances id_state = np.argmin([distx_, distx, distx_steady_, distx_steady], axis=0) on, off, steady, steady_ = (id_state == 1), (id_state == 0), (id_state == 3), (id_state == 2) on, off, steady, steady_ = on / on, off / off, steady / steady, steady_ / steady_ varu, varu_ = np.nanvar(distu * on, 0), np.nanvar(distu_ * off, 0) vars, vars_ = np.nanvar(dists * on, 0), np.nanvar(dists_ * off, 0) varu_steady, varu_steady_ = np.nanvar(distu_steady * steady, 0), np.nanvar(distu_steady_ * steady_, 0) vars_steady, vars_steady_ = np.nanvar(dists_steady * steady, 0), np.nanvar(dists_steady_ * steady_, 0) distx = distu ** 2 / varu + dists ** 2 / vars distx_ = distu_ ** 2 / varu_ + dists_ ** 2 / vars_ distx_steady = distu_steady ** 2 / varu_steady + dists_steady ** 2 / vars_steady distx_steady_ = distu_steady_ ** 2 / varu_steady_ + dists_steady_ ** 2 / vars_steady_ if mode is 'soft': div = 1 / (2 * np.pi) varx = np.sqrt(varu * vars) varx_ = np.sqrt(varu_ * vars_) varx_steady = np.sqrt(varu_steady * vars_steady) varx_steady_ = np.sqrt(varu_steady_ * vars_steady_) l = div / varx * np.exp(-.5 * distx) l_ = div / varx_ * np.exp(-.5 * distx_) l_steady = div / varx_steady * np.exp(-.5 * distx_steady) l_steady_ = div / varx_steady_ * np.exp(-.5 * distx_steady_) if normalized: l, l_, l_steady, l_steady_ = np.array([l, l_, l_steady, l_steady_]) / (l + l_ + l_steady + l_steady_) return l, l_, l_steady, l_steady_ else: o = 1 if mode is 'on' else 0 if mode is 'off' else np.argmin([distx_, distx, distx_steady_, distx_steady], axis=0) tau = tau * (o == 1) + tau_ * (1 - o) t = tau * o + (tau_ + t0_) * (1 - o) return t, tau, o def assign_timepoints(u, s, alpha, beta, gamma, t0_=None, u0_=None, s0_=None, mode='hard'): if t0_ is None: t0_ = tau_inv(u0_, s0_, 0, 0, alpha, beta, gamma) if u0_ is None or s0_ is None: u0_, s0_ = (unspliced(t0_, 0, alpha, beta), spliced(t0_, 0, 0, alpha, beta, gamma)) x_obs = np.vstack([u, s]).T if mode is 'projection': t0 = tau_u(np.min(u[s > 0]), u0_, 0, beta) tpoints = np.linspace(0, t0_, num=200) tpoints_ = np.linspace(0, t0, num=200)[1:] xt = np.vstack([unspliced(tpoints, 0, alpha, beta), 
spliced(tpoints, 0, 0, alpha, beta, gamma)]).T xt_ = np.vstack([unspliced(tpoints_, u0_, 0, beta), spliced(tpoints_, s0_, u0_, 0, beta, gamma)]).T # assign time points (oth. projection onto 'on' and 'off' curve) tau, tau_ = np.zeros(len(u)), np.zeros(len(u)) for i, xi in enumerate(x_obs): diffx, diffx_ = ((xt - xi)**2).sum(1), ((xt_ - xi)**2).sum(1) tau[i] = tpoints[np.argmin(diffx)] tau_[i] = tpoints_[np.argmin(diffx_)] else: tau = tau_inv(u, s, 0, 0, alpha, beta, gamma) tau = np.clip(tau, 0, t0_) tau_ = tau_inv(u, s, u0_, s0_, 0, beta, gamma) tau_ = np.clip(tau_, 0, np.max(tau_[s > 0])) # l, l_, l_steady, l_steady_ = compute_state_likelihoods(u, s, alpha, beta, gamma, t0_, u0_, s0_) xt = np.vstack([unspliced(tau, 0, alpha, beta), spliced(tau, 0, 0, alpha, beta, gamma)]).T xt_ = np.vstack([unspliced(tau_, u0_, 0, beta), spliced(tau_, s0_, u0_, 0, beta, gamma)]).T diffx = ((xt - x_obs)**2).sum(1) diffx_ = ((xt_ - x_obs)**2).sum(1) o = 1 if mode is 'on' else 0 if mode is 'off' else np.argmax([diffx_, diffx], axis=0) tau = tau * o + tau_ * (1 - o) t = tau * o + (tau_ + t0_) * (1 - o) if mode is 'soft': var = np.var(s) l = np.exp(- .5 * diffx / var) + .1 l_ = np.exp(- .5 * diffx_ / var) + .1 o = l / (l + l_) return t, tau, o def fit_alpha(u, s, tau, o, beta, gamma, fit_scaling=False): off, on = o == 0, o == 1 # 'on' state expu, exps = exp(-beta * tau[on]), exp(-gamma * tau[on]) # 'off' state t0_ = np.max(tau * o) expu_, exps_ = exp(-beta * tau[off]), exp(-gamma * tau[off]) expu0_, exps0_ = exp(-beta * t0_), exp(-gamma * t0_) # from unspliced dynamics c_beta = 1 / beta * (1 - expu) c_beta_ = 1 / beta * (1 - expu0_) * expu_ # from spliced dynamics c_gamma = (1 - exps) / gamma + (exps - expu) * inv(gamma - beta) c_gamma_ = ((1 - exps0_) / gamma + (exps0_ - expu0_) * inv(gamma - beta)) * exps_ - (1 - expu0_) * (exps_ - expu_) * inv(gamma - beta) # concatenating together c = np.concatenate([c_beta, c_gamma, c_beta_, c_gamma_]).T x = np.concatenate([u[on], s[on], u[off], s[off]]).T alpha = (c * x).sum() / (c ** 2).sum() if (on.sum() > 0 and off.sum() > 0) else None if fit_scaling: # alternatively compute alpha and scaling simultaneously c = np.concatenate([c_gamma, c_gamma_]).T x = np.concatenate([s[on], s[off]]).T alpha = (c * x).sum() / (c ** 2).sum() c = np.concatenate([c_beta, c_beta_]).T x = np.concatenate([u[on], u[off]]).T scaling = (c * x).sum() / (c ** 2).sum() / alpha # ~ alpha * z / alpha return alpha, scaling else: return alpha def fit_scaling(u, t, t_, alpha, beta): tau, alpha, u0, _ = vectorize(t, t_, alpha, beta) ut = unspliced(tau, u0, alpha, beta) return (u * ut).sum() / (ut ** 2).sum() def vectorize(t, t_, alpha, beta, gamma=None, alpha_=0, u0=0, s0=0): o = np.array(t <= t_, dtype=int) tau = t * o + (t - t_) * (1 - o) u0_ = unspliced(t_, u0, alpha, beta) s0_ = spliced(t_, s0, u0, alpha, beta, gamma if gamma is not None else beta / 2) # vectorize u0, s0 and alpha u0 = u0 * o + u0_ * (1 - o) s0 = s0 * o + s0_ * (1 - o) alpha = alpha * o + alpha_ * (1 - o) return tau, alpha, u0, s0 """State-independent derivatives""" def dtau(u, s, alpha, beta, gamma, u0, s0, du0=[0, 0, 0], ds0=[0, 0, 0, 0]): a, b, g, gb, b0 = alpha, beta, gamma, gamma - beta, beta * inv(gamma - beta) cu = s - a/g - b0 * (u - a/b) c0 = s0 - a/g - b0 * (u0 - a/b) cu += cu == 0 c0 += c0 == 0 cu_, c0_ = 1 / cu, 1 / c0 dtau_a = b0/g * (c0_ - cu_) + 1/g * c0_ * (ds0[0] - b0 * du0[0]) dtau_b = 1/gb**2 * ((u - a/g) * cu_ - (u0 - a/g) * c0_) dtau_c = - a/g * (1/g**2 - 1/gb**2) * (cu_ - c0_) - b0/g/gb * (u*cu_ - u0*c0_) # + 
1/g**2 * np.log(cu/c0) return dtau_a, dtau_b, dtau_c def du(tau, alpha, beta, u0=0, du0=[0, 0, 0], dtau=[0, 0, 0]): # du0 is the derivative du0 / d(alpha, beta, tau) expu, cb = exp(-beta * tau), alpha / beta du_a = du0[0] * expu + 1. / beta * (1 - expu) + (alpha - beta * u0) * dtau[0] * expu du_b = du0[1] * expu - cb / beta * (1 - expu) + (cb - u0) * tau * expu + (alpha - beta * u0) * dtau[1] * expu return du_a, du_b def ds(tau, alpha, beta, gamma, u0=0, s0=0, du0=[0, 0, 0], ds0=[0, 0, 0, 0], dtau=[0, 0, 0]): # ds0 is the derivative ds0 / d(alpha, beta, gamma, tau) expu, exps, = exp(-beta * tau), exp(-gamma * tau) expus = exps - expu cbu = (alpha - beta * u0) * inv(gamma - beta) ccu = (alpha - gamma * u0) * inv(gamma - beta) ccs = alpha / gamma - s0 - cbu ds_a = ds0[0] * exps + 1. / gamma * (1 - exps) + 1 * inv(gamma - beta) * (1 - beta * du0[0]) * expus + (ccs * gamma * exps + cbu * beta * expu) * dtau[0] ds_b = ds0[1] * exps + cbu * tau * expu + 1 * inv(gamma - beta) * (ccu - beta * du0[1]) * expus + (ccs * gamma * exps + cbu * beta * expu) * dtau[1] ds_c = ds0[2] * exps + ccs * tau * exps - alpha / gamma**2 * (1 - exps) - cbu * inv(gamma - beta) * expus + (ccs * gamma * exps + cbu * beta * expu) * dtau[2] return ds_a, ds_b, ds_c def derivatives(u, s, t, t0_, alpha, beta, gamma, scaling=1, alpha_=0, u0=0, s0=0, weights=None): o = np.array(t <= t0_, dtype=int) du0 = np.array(du(t0_, alpha, beta, u0))[:, None] * (1 - o)[None, :] ds0 = np.array(ds(t0_, alpha, beta, gamma, u0, s0))[:, None] * (1 - o)[None, :] tau, alpha, u0, s0 = vectorize(t, t0_, alpha, beta, gamma, alpha_, u0, s0) dt = np.array(dtau(u, s, alpha, beta, gamma, u0, s0, du0, ds0)) # state-dependent derivatives: du_a, du_b = du(tau, alpha, beta, u0, du0, dt) du_a, du_b = du_a * scaling, du_b * scaling ds_a, ds_b, ds_c = ds(tau, alpha, beta, gamma, u0, s0, du0, ds0, dt) # evaluate derivative of likelihood: udiff = np.array(unspliced(tau, u0, alpha, beta) * scaling - u) sdiff = np.array(spliced(tau, s0, u0, alpha, beta, gamma) - s) if weights is not None: udiff = np.multiply(udiff, weights) sdiff = np.multiply(sdiff, weights) dl_a = (du_a * (1 - o)).dot(udiff) + (ds_a * (1 - o)).dot(sdiff) dl_a_ = (du_a * o).dot(udiff) + (ds_a * o).dot(sdiff) dl_b = du_b.dot(udiff) + ds_b.dot(sdiff) dl_c = ds_c.dot(sdiff) dl_tau, dl_t0_ = None, None return dl_a, dl_b, dl_c, dl_a_, dl_tau, dl_t0_ """Base Class for Dynamics Recovery""" class BaseDynamics: def __init__(self, n_obs): self.s, self.u, self.use_raw = None, None, None zeros, zeros3 = np.zeros(n_obs), np.zeros((3, 1)) self.u0, self.s0, self.u0_, self.s0_, self.t_, self.scaling = None, None, None, None, None, None self.t, self.tau, self.o, self.weights = zeros, zeros, zeros, zeros self.alpha, self.beta, self.gamma, self.alpha_, self.pars = None, None, None, None, None self.dpars, self.m_dpars, self.v_dpars, self.loss = zeros3, zeros3, zeros3, [] def get_vals(self, t=None, t_=None, alpha=None, beta=None, gamma=None, scaling=None, reassign_time=False): alpha = self.alpha if alpha is None else alpha beta = self.beta if beta is None else beta gamma = self.gamma if gamma is None else gamma scaling = self.scaling if scaling is None else scaling t = self.t if t is None else t u, s, tau, o, w = self.u / scaling, self.s, self.tau, self.o, self.weights if reassign_time: u_w, s_w, tau_w, o_w = (u, s, tau, o) if w is None else (u[w], s[w], tau[w], o[w]) t_ = find_swichting_time(u_w, s_w, tau_w, o_w, alpha, beta, gamma) if t_ is None else t_ t, tau, o = assign_timepoints(u, s, alpha, beta, gamma, t_) 
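# With reassign_time, the switching point t_ is first re-estimated from the current rates (unless a value
# was passed explicitly) and every cell then receives a new latent time t, within-phase time tau and
# induction/repression state o; otherwise the stored switching time self.t_ is reused below.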
else: t_ = self.t_ if t_ is None else t_ return t, t_, alpha, beta, gamma, scaling def get_ut(self, t=None, t_=None, alpha=None, beta=None, gamma=None, scaling=None, reassign_time=False, mode=None): t, t_, alpha, beta, gamma, scaling = self.get_vals(t, t_, alpha, beta, gamma, scaling, reassign_time) tau, alpha, u0, s0 = vectorize(t, t_, alpha, beta, gamma) if mode is None \ else (self.get_time_assignment(mode='on')[1], self.alpha, self.u0, self.s0) if mode is 'on' \ else (self.get_time_assignment(mode='off')[1], self.alpha_, self.u0_, self.s0_) return unspliced(tau, u0, alpha, beta) def get_st(self, t=None, t_=None, alpha=None, beta=None, gamma=None, scaling=None, reassign_time=False, mode=None): t, t_, alpha, beta, gamma, scaling = self.get_vals(t, t_, alpha, beta, gamma, scaling, reassign_time) tau, alpha, u0, s0 = vectorize(t, t_, alpha, beta, gamma) if mode is None \ else (self.get_time_assignment(mode='on')[1], self.alpha, self.u0, self.s0) if mode is 'on' \ else (self.get_time_assignment(mode='off')[1], self.alpha_, self.u0_, self.s0_) return spliced(tau, s0, u0, alpha, beta, gamma) def get_vt(self, mode='soft'): t, t_, alpha, beta, gamma, scaling = self.get_vals() _, _, o = self.get_time_assignment(mode='soft') u, s, u_, s_ = self.get_ut(mode='on'), self.get_st(mode='on'), self.get_ut(mode='off'), self.get_st(mode='off') return (beta * u - gamma * s) * o + (beta * u_ - gamma * s_) * (1 - o) def get_optimal_switch(self, alpha=None, beta=None, gamma=None): u, s, tau, o, w = self.u / self.scaling, self.s, self.tau, self.o, self.weights if w is not None: u, s, tau, o = (u[w], s[w], tau[w], o[w]) return find_swichting_time(u, s, tau, o, self.alpha if alpha is None else alpha, self.beta if beta is None else beta, self.gamma if gamma is None else gamma) def get_optimal_alpha(self): u, s, tau, o, w = self.u / self.scaling, self.s, self.tau, self.o, self.weights if w is not None: u, s, tau, o = (u[w], s[w], tau[w], o[w]) return fit_alpha(u, s, tau, o, self.beta, self.gamma) def get_optimal_scaling(self): u, t, w = self.u / self.scaling, self.t, self.weights if w is not None: u, t = (u[w], t[w]) return fit_scaling(u, t, self.t_, self.alpha, self.beta) def get_time_assignment(self, t_=None, alpha=None, beta=None, gamma=None, mode='hard'): t, tau, o = assign_timepoints(self.u / self.scaling, self.s, self.alpha if alpha is None else alpha, self.beta if beta is None else beta, self.gamma if gamma is None else gamma, self.get_optimal_switch(alpha, beta, gamma) if t_ is None else t_, mode=mode) return t, tau, o def get_loss(self, t=None, t_=None, alpha=None, beta=None, gamma=None, scaling=None, reassign_time=False): alpha = self.alpha if alpha is None else alpha beta = self.beta if beta is None else beta gamma = self.gamma if gamma is None else gamma scaling = self.scaling if scaling is None else scaling t = self.t if t is None else t u, s, tau, o, w = self.u, self.s, self.tau, self.o, self.weights if w is not None: u, s, t, tau, o = u[w], s[w], t[w], tau[w], o[w] if reassign_time: t_ = find_swichting_time(u / scaling, s, tau, o, alpha, beta, gamma) if t_ is None else t_ t, tau, o = assign_timepoints(u / scaling, s, alpha, beta, gamma, t_) else: t_ = self.t_ if t_ is None else t_ tau, alpha, u0, s0 = vectorize(t, t_, alpha, beta, gamma) udiff = np.array(unspliced(tau, u0, alpha, beta) * scaling - u) sdiff = np.array(spliced(tau, s0, u0, alpha, beta, gamma) - s) loss = np.sum(udiff ** 2 + sdiff ** 2) / len(udiff) return loss def get_likelihood(self): u, s, t, t_ = self.u, self.s, self.t, self.t_ 
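# Goodness of fit is scored with a Gaussian-style likelihood of the residuals: deviations of the observed
# u and s from the fitted dynamics are standardized by their overall standard deviations, combined with the
# empirical u-s correlation as in a bivariate normal density, and the squared residuals are averaged over
# cells (the division by 2 * len(udiff)), so the resulting value acts as a per-gene fit score that can be
# compared across genes.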
alpha, beta, gamma, scaling = self.alpha, self.beta, self.gamma, self.scaling tau, alpha, u0, s0 = vectorize(t, t_, alpha, beta, gamma) std_u, std_s, = np.std(u), np.std(s) corr = np.corrcoef(u, s)[0, 1] udiff = np.array(unspliced(tau, u0, alpha, beta) * scaling - u) / std_u sdiff = np.array(spliced(tau, s0, u0, alpha, beta, gamma) - s) / std_s denom = 2 * np.pi * std_u * std_s * np.sqrt(1 - corr ** 2) nom = -.5 / (1 - corr ** 2) * (np.sum(udiff ** 2) + np.sum(sdiff ** 2) - 2 * corr * np.sum(udiff * sdiff)) / ( 2 * len(udiff)) likelihood = 1 / np.sqrt(denom) * np.exp(nom) return likelihood def uniform_weighting(self, n_regions=5, perc=95): # deprecated from numpy import union1d as union from numpy import intersect1d as intersect u, s = self.u, self.s u_b = np.linspace(0, np.percentile(u, perc), n_regions) s_b = np.linspace(0, np.percentile(s, perc), n_regions) regions, weights = {}, np.ones(len(u)) for i in range(n_regions): if i == 0: region = intersect(np.where(u < u_b[i + 1]), np.where(s < s_b[i + 1])) elif i < n_regions - 1: lower_cut = union(np.where(u > u_b[i]), np.where(s > s_b[i])) upper_cut = intersect(np.where(u < u_b[i + 1]), np.where(s < s_b[i + 1])) region = intersect(lower_cut, upper_cut) else: region = union(np.where(u > u_b[i]), np.where(s > s_b[i])) # lower_cut for last region regions[i] = region if len(region) > 0: weights[region] = n_regions / len(region) # set weights accordingly such that each region has an equal overall contribution. self.weights = weights * len(u) / np.sum(weights) self.u_b, self.s_b = u_b, s_b def plot_phase(self, adata=None, t=None, t_=None, alpha=None, beta=None, gamma=None, scaling=None, reassign_time=False, optimal=False, mode='soft', **kwargs): from ..plotting.scatter import scatter from ..plotting.simulation import show_full_dynamics multi = 0 for param in [alpha, beta, gamma, t_]: if param is not None and not np.isscalar(param): multi += 1 if multi == 0: t, t_, alpha, beta, gamma, scaling = self.get_vals(t, t_, alpha, beta, gamma, scaling, reassign_time) u, s = self.u, self.s ut = self.get_ut(t, t_, alpha, beta, gamma) * scaling st = self.get_st(t, t_, alpha, beta, gamma) idx_sorted = np.argsort(t) ut, st, t = ut[idx_sorted], st[idx_sorted], t[idx_sorted] args = {"color": self.get_time_assignment(mode=mode)[2], "color_map": 'RdYlGn', "vmin": -.1, "vmax": 1.1} args.update(kwargs) ax = scatter(adata, x=s, y=u, colorbar=False, show=False, **kwargs) linestyle = '-' if not optimal else '--' pl.plot(st, ut, color='purple', linestyle=linestyle) pl.xlabel('s') pl.ylabel('u') if False: # colorbar ax = pl.gca() cax = inset_axes(ax, width="2%", height="30%", loc=4, borderpad=0) import matplotlib as mpl cmap = mpl.colors.LinearSegmentedColormap.from_list("", ["blue", "purple", "red"]) cb1 = mpl.colorbar.ColorbarBase(cax, cmap=cmap, orientation='vertical') cb1.set_ticks([0.1, 0.9]) cb1.ax.set_yticklabels(['-', '+']) pl.sca(ax) elif multi == 1: a, b, g = alpha, beta, gamma if alpha is not None and not np.isscalar(alpha): n = len(alpha) loss = [] for i in range(n): self.plot_phase(adata, t, t_, alpha[i], b, g, scaling=scaling, reassign_time=reassign_time, **kwargs) loss.append(self.get_loss(t, t_, alpha[i], b, g, scaling=scaling, reassign_time=reassign_time)) opt = np.argmin(loss) self.plot_phase(adata, t, t_, alpha[opt], b, g, scaling=scaling, reassign_time=reassign_time, optimal=True, **kwargs) elif beta is not None and not np.isscalar(beta): n = len(beta) loss = [] for i in range(n): self.plot_phase(t, t_, a, beta[i], g, scaling=scaling, 
reassign_time=reassign_time, color=[(i+1)/n, 0, 1-(i+1)/n]) loss.append(self.get_loss(t, t_, a, beta[i], g, scaling=scaling, reassign_time=reassign_time)) opt = np.argmin(loss) self.plot_phase(adata, t, t_, a, beta[opt], g, scaling=scaling, reassign_time=reassign_time, color=[0, 1, 0], optimal=True) elif gamma is not None and not np.isscalar(gamma): n = len(gamma) loss = [] for i in range(n): self.plot_phase(adata, t, t_, a, b, gamma[i], scaling=scaling, reassign_time=reassign_time, color=[(i+1)/n, 0, 1-(i+1)/n]) loss.append(self.get_loss(t, t_, a, b, gamma[i], scaling=scaling, reassign_time=reassign_time)) opt = np.argmin(loss) self.plot_phase(t, t_, a, b, gamma[opt], scaling=scaling, reassign_time=reassign_time, color=[0, 1, 0], optimal=True) elif t_ is not None and not np.isscalar(t_): n = len(t_) loss = [] for i in range(n): self.plot_phase(adata, t, t_[i], a, b, g, scaling=scaling, reassign_time=reassign_time, **kwargs) loss.append(self.get_loss(t, t_[i], a, b, g, scaling=scaling, reassign_time=reassign_time)) opt = np.argmin(loss) self.plot_phase(adata, t, t_[opt], a, b, g, scaling=scaling, reassign_time=reassign_time, optimal=True, **kwargs) elif multi == 2: print('Too many varying Values. Only one varying parameter allowed.') def plot_regions(self): u, s, ut, st = self.u, self.s, self.ut, self.st u_b, s_b = self.u_b, self.s_b pl.figure(dpi=100) pl.scatter(s, u, color='grey') pl.xlim(0); pl.ylim(0); pl.xlabel('spliced'); pl.ylabel('unspliced') for i in range(len(s_b)): pl.plot([s_b[i], s_b[i], 0], [0, u_b[i], u_b[i]]) def plot_derivatives(self): u, s = self.u, self.s alpha, beta, gamma = self.alpha, self.beta, self.gamma t, tau, o, t_ = self.t, self.tau, self.o, self.t_ du0 = np.array(du(t_, alpha, beta))[:, None] * (1 - o)[None, :] ds0 = np.array(ds(t_, alpha, beta, gamma))[:, None] * (1 - o)[None, :] tau, alpha, u0, s0 = vectorize(t, t_, alpha, beta, gamma) dt = np.array(dtau(u, s, alpha, beta, gamma, u0, s0)) du_a, du_b = du(tau, alpha, beta, u0=u0, du0=du0, dtau=dt) ds_a, ds_b, ds_c = ds(tau, alpha, beta, gamma, u0=u0, s0=s0, du0=du0, ds0=ds0, dtau=dt) idx = np.argsort(t) t = np.sort(t) pl.plot(t, du_a[idx], label=r'$\partial u / \partial\alpha$') pl.plot(t, .2 * du_b[idx], label=r'$\partial u / \partial \beta$') pl.plot(t, ds_a[idx], label=r'$\partial s / \partial \alpha$') pl.plot(t, ds_b[idx], label=r'$\partial s / \partial \beta$') pl.plot(t, .2 * ds_c[idx], label=r'$\partial s / \partial \gamma$') pl.legend() pl.xlabel('t') def plot_contours(self, xkey='gamma', ykey='alpha', x_sight=[-.9, .9], y_sight=[-.9, .9], num=20, dpi=None, fontsize=8, **kwargs): from ..plotting.utils import update_axes x_var = eval('self.' + xkey) y_var = eval('self.' 
+ ykey) x = np.linspace(x_sight[0], x_sight[1], num=num) * x_var + x_var y = np.linspace(y_sight[0], y_sight[1], num=num) * y_var + y_var f0 = lambda x, y: self.get_loss(**{xkey: x, ykey: y}, reassign_time=False) fp = lambda x, y: self.get_loss(**{xkey: x, ykey: y}, reassign_time=True) z0, zp = np.zeros((len(x), len(x))), np.zeros((len(x), len(x))) for i, xi in enumerate(x): for j, yi in enumerate(y): z0[i, j] = f0(xi, yi) zp[i, j] = fp(xi, yi) # ix, iy = np.unravel_index(zp.argmin(), zp.shape) # gamma_opt, alpha_opt = x[ix], y[iy] x_label = r'$'+'\\'+xkey+'$' if xkey in ['gamma', 'alpha', 'beta'] else xkey y_label = r'$' + '\\' + ykey + '$' if ykey in ['gamma', 'alpha', 'beta'] else ykey figsize = rcParams['figure.figsize'] fig, (ax1, ax2) = pl.subplots(1, 2, figsize=(figsize[0], figsize[1] / 2), dpi=dpi) ax1.contourf(x, y, np.log1p(zp.T), levels=20, cmap='RdGy_r') contours = ax1.contour(x, y, np.log1p(zp.T), 4, colors='k', linewidths=.5) ax1.clabel(contours, inline=True, fontsize=fontsize * .75) ax1.scatter(x=x_var, y=y_var, s=50, c='purple', zorder=3, **kwargs) # ax1.quiver(self.gamma, self.alpha, 0, self.get_optimal_alpha() - self.alpha, color='k', zorder=3, # headlength=4, headwidth=3, headaxislength=3, alpha=.5) ax1.set_xlabel(x_label, fontsize=fontsize) ax1.set_ylabel(y_label, fontsize=fontsize) ax1.set_title('MSE (profiled)', fontsize=fontsize) ax1 = update_axes(ax1, fontsize, frameon=True) ax2.contourf(x, y, np.log1p(z0.T), levels=20, cmap='RdGy_r') contours = ax2.contour(x, y, np.log1p(z0.T), 4, colors='k', linewidths=.5) ax2.clabel(contours, inline=True, fontsize=fontsize * .75) ax2.scatter(x=x_var, y=y_var, s=50, c='purple', zorder=3, **kwargs) ax2.set_xlabel(x_label, fontsize=fontsize) ax2.set_ylabel(y_label, fontsize=fontsize) ax2.set_title('MSE', fontsize=fontsize) ax2 = update_axes(ax2, fontsize, frameon=True) ix, iy = np.unravel_index(zp.argmin(), zp.shape) x_opt, y_opt = x[ix].round(2), y[ix].round(2) return x_opt, y_opt PKyqmNg; ; 0scvelo/tools/dynamical_model_utils_deprecated.pyfrom .dynamical_model_utils import tau_u, inv, unspliced, spliced, vectorize import warnings import numpy as np exp = np.exp def tau_s(s, s0, u0, alpha, beta, gamma, u=None, tau=None, eps=1e-2): if tau is None: tau = tau_u(u, u0, alpha, beta) if u is not None else 1 tau_prev, loss, n_iter, max_iter, mixed_states = 1e6, 1e6, 0, 10, np.any(alpha == 0) b0 = (alpha - beta * u0) * inv(gamma - beta) g0 = s0 - alpha / gamma + b0 with warnings.catch_warnings(): warnings.simplefilter("ignore") while np.abs(tau - tau_prev).max() > eps and loss > eps and n_iter < max_iter: tau_prev, n_iter = tau, n_iter + 1 expu, exps = b0 * exp(-beta * tau), g0 * exp(-gamma * tau) f = exps - expu + alpha / gamma # >0 ft = - gamma * exps + beta * expu # >0 if on else <0 ftt = gamma ** 2 * exps - beta ** 2 * expu # ax^2 + bx + c = 0 <-> 1/2 ftt x^2 + ft x + f = s, where x = (tau - tau_prev) a, b, c = ftt / 2, ft, f - s term = b ** 2 - 4 * a * c update = (-b + np.sqrt(term)) / (2 * a) if mixed_states: # linear approx for off-state due of non-injectivity: tau = tau_prev - c / b update = np.nan_to_num(update) * (alpha > 0) + (- c / b) * (alpha <= 0) tau = np.nan_to_num(tau_prev + update) * (s != 0) if np.any(term > 0) else tau_prev / 10 loss = np.abs(alpha / gamma + g0 * exp(-gamma * tau) - b0 * exp(-beta * tau) - s).max() return np.clip(tau, 0, None) def assign_timepoints_projection(u, s, alpha, beta, gamma, t0_=None, u0_=None, s0_=None, n_timepoints=300): if t0_ is None: t0_ = tau_u(u0_, 0, alpha, beta) if u0_ is None or s0_ 
is None: u0_, s0_ = (unspliced(t0_, 0, alpha, beta), spliced(t0_, 0, 0, alpha, beta, gamma)) tpoints = np.linspace(0, t0_, num=n_timepoints) tpoints_ = np.linspace(0, tau_u(np.min(u[s > 0]), u0_, 0, beta), num=n_timepoints)[1:] xt = np.vstack([unspliced(tpoints, 0, alpha, beta), spliced(tpoints, 0, 0, alpha, beta, gamma)]).T xt_ = np.vstack([unspliced(tpoints_, u0_, 0, beta), spliced(tpoints_, s0_, u0_, 0, beta, gamma)]).T x_obs = np.vstack([u, s]).T # assign time points (oth. projection onto 'on' and 'off' curve) tau, o, diff = np.zeros(len(u)), np.zeros(len(u), dtype=int), np.zeros(len(u)) tau_alt, diff_alt = np.zeros(len(u)), np.zeros(len(u)) for i, xi in enumerate(x_obs): diffs, diffs_ = np.linalg.norm((xt - xi), axis=1), np.linalg.norm((xt_ - xi), axis=1) idx, idx_ = np.argmin(diffs), np.argmin(diffs_) o[i] = np.argmin([diffs_[idx_], diffs[idx]]) tau[i] = [tpoints_[idx_], tpoints[idx]][o[i]] diff[i] = [diffs_[idx_], diffs[idx]][o[i]] tau_alt[i] = [tpoints_[idx_], tpoints[idx]][1-o[i]] diff_alt[i] = [diffs_[idx_], diffs[idx]][1-o[i]] t = tau * o + (t0_ + tau) * (1 - o) # # remove meaningless jumps (reassign timepoints/states) # idx_ord = np.argsort(t) # t_ord = t[idx_ord] # dt_ord = t_ord - np.insert(t_ord[:-1], 0, 0) # dt = dt_ord[np.argsort(idx_ord)] # # Poisson with std = sqrt(mean) -> ~99.9% confidence # idx = np.where(dt > dt.mean() + 3 * np.sqrt(dt.mean()))[0] # # if len(idx) > 0: # tvals = t[idx] # idx_jumps = np.where(t * (1 - o) >= np.min(tvals[tvals > t0_]))[0] if np.any(tvals > t0_) else [] # idx_jumps_ = np.where(t * o >= np.min(tvals[tvals <= t0_]))[0] if np.any(tvals <= t0_) else [] # idx = np.array(np.concatenate([idx_jumps, idx_jumps_]), dtype=int) # # # change if alternative is reasonable # change = diff_alt[idx] < np.clip(2 * diff[idx], diff.mean() + 2 * diff.std(), None) # tau[idx] = tau_alt[idx] * change + tau[idx] * (1 - change) # o[idx] = (1 - o[idx]) * change + o[idx] * (1 - change) # # t = tau * o + (t0_ + tau) * (1 - o) return t, tau, o """State-independent derivatives""" # def du_du0(beta, tau): # return exp(-beta * tau) # def ds_ds0(gamma, tau): # return exp(-gamma * tau) # def ds_du0(beta, gamma, tau): # return - beta / (gamma - beta) * (exp(-gamma * tau) - exp(-beta * tau)) # def dus_u0s0(tau, beta, gamma): # du_u0 = exp(-beta * tau) # ds_s0 = exp(-gamma * tau) # ds_u0 = - beta / (gamma - beta) * (ds_s0 - du_u0) # return du_u0, ds_s0, ds_u0 # def dus_tau(tau, alpha, beta, gamma, u0=0, s0=0, du0_t0=0, ds0_t0=0): # expu, exps, cb, cc = exp(-beta * tau), exp(-gamma * tau), alpha - beta * u0, alpha - gamma * s0 # du_tau = (cb - du0_t0) * expu # ds_tau = (cc - ds0_t0) * exps - cb / (gamma - beta) * (gamma * exps - beta * expu) # + du0_t0 * beta / (gamma - beta) * (exps - expu) # return du_tau, ds_tau def dtau(u, s, alpha, beta, gamma, u0, s0, du0=[0, 0, 0], ds0=[0, 0, 0, 0]): a, b, g, gb, b0 = alpha, beta, gamma, gamma - beta, beta * inv(gamma - beta) cu = s - a/g - b0 * (u - a/b) c0 = s0 - a/g - b0 * (u0 - a/b) cu += cu == 0 c0 += c0 == 0 cu_, c0_ = 1 / cu, 1 / c0 dtau_a = b0/g * (c0_ - cu_) + 1/g * c0_ * (ds0[0] - b0 * du0[0]) dtau_b = 1/gb**2 * ((u - a/g) * cu_ - (u0 - a/g) * c0_) dtau_c = - a/g * (1/g**2 - 1/gb**2) * (cu_ - c0_) - b0/g/gb * (u*cu_ - u0*c0_) # + 1/g**2 * np.log(cu/c0) return dtau_a, dtau_b, dtau_c def du(tau, alpha, beta, u0=0, du0=[0, 0, 0], dtau=[0, 0, 0]): # du0 is the derivative du0 / d(alpha, beta, tau) expu, cb = exp(-beta * tau), alpha / beta du_a = du0[0] * expu + 1. 
/ beta * (1 - expu) + (alpha - beta * u0) * dtau[0] * expu du_b = du0[1] * expu - cb / beta * (1 - expu) + (cb - u0) * tau * expu + (alpha - beta * u0) * dtau[1] * expu # du_tau = (alpha - beta * u0 - du0[2]) * expu return du_a, du_b def ds(tau, alpha, beta, gamma, u0=0, s0=0, du0=[0, 0, 0], ds0=[0, 0, 0, 0], dtau=[0, 0, 0]): # ds0 is the derivative ds0 / d(alpha, beta, gamma, tau) expu, exps = exp(-beta * tau), exp(-gamma * tau) expus = exps - expu cbu = (alpha - beta * u0) * inv(gamma - beta) ccu = (alpha - gamma * u0) * inv(gamma - beta) ccs = alpha / gamma - s0 - cbu # dsu0 = ds0 * exps - beta / (gamma - beta) * du0 ds_a = ds0[0] * exps + 1. / gamma * (1 - exps) + 1 * inv(gamma - beta) * (1 - beta * du0[0]) * expus + (ccs * gamma * exps + cbu * beta * expu) * dtau[0] ds_b = ds0[1] * exps + cbu * tau * expu + 1 * inv(gamma - beta) * (ccu - beta * du0[1]) * expus + (ccs * gamma * exps + cbu * beta * expu) * dtau[1] ds_c = ds0[2] * exps + ccs * tau * exps - alpha / gamma**2 * (1 - exps) - cbu * inv(gamma - beta) * expus + (ccs * gamma * exps + cbu * beta * expu) * dtau[2] # ds_dtau = (alpha - gamma * s0 - ds0[3]) * exps - cbu * (gamma * exps - beta * expu) + du0[2] * beta * inv(gamma - beta) * (exps - expu) return ds_a, ds_b, ds_c def derivatives(u, s, t, t0_, alpha, beta, gamma, scaling=1, alpha_=0, u0=0, s0=0, weights=None): o = np.array(t <= t0_, dtype=int) du0 = np.array(du(t0_, alpha, beta, u0))[:, None] * (1 - o)[None, :] ds0 = np.array(ds(t0_, alpha, beta, gamma, u0, s0))[:, None] * (1 - o)[None, :] tau, alpha, u0, s0 = vectorize(t, t0_, alpha, beta, gamma, alpha_, u0, s0) dt = np.array(dtau(u, s, alpha, beta, gamma, u0, s0, du0, ds0)) # state-dependent derivatives: du_a, du_b = du(tau, alpha, beta, u0, du0, dt) du_a, du_b = du_a * scaling, du_b * scaling ds_a, ds_b, ds_c = ds(tau, alpha, beta, gamma, u0, s0, du0, ds0, dt) # evaluate derivative of likelihood: udiff = np.array(unspliced(tau, u0, alpha, beta) * scaling - u) sdiff = np.array(spliced(tau, s0, u0, alpha, beta, gamma) - s) if weights is not None: udiff = np.multiply(udiff, weights) sdiff = np.multiply(sdiff, weights) dl_a = (du_a * (1 - o)).dot(udiff) + (ds_a * (1 - o)).dot(sdiff) dl_a_ = (du_a * o).dot(udiff) + (ds_a * o).dot(sdiff) dl_b = du_b.dot(udiff) + ds_b.dot(sdiff) dl_c = ds_c.dot(sdiff) # dl_tau = du_tau * udiff + ds_tau * sdiff # dl_t0_ = - du_tau.dot(udiff) - ds_tau.dot(sdiff) dl_tau, dl_t0_ = None, None return dl_a, dl_b, dl_c, dl_a_, dl_tau, dl_t0_
scvelo/tools/optimization.py
from .utils import sum_obs, prod_sum_obs, make_dense from scipy.optimize import minimize from scipy.sparse import csr_matrix, issparse import numpy as np import warnings def get_weight(x, y=None, perc=95): if issparse(x): x = x.A xy_norm = x / np.clip(np.max(x, axis=0), 1e-3, None) if y is not None: if issparse(y): y = y.A xy_norm += y / np.clip(np.max(y, axis=0), 1e-3, None) if isinstance(perc, int): weights = xy_norm >= np.percentile(xy_norm, perc, axis=0) else: lb, ub = np.percentile(xy_norm, perc, axis=0) weights = (xy_norm <= lb) | (xy_norm >= ub) return weights def leastsq_NxN(x, y, fit_offset=False, perc=None): """Solution to least squares: gamma = cov(X,Y) / var(X) """ if perc is not None: if not fit_offset and isinstance(perc, (list, tuple)): perc = perc[1] weights = csr_matrix(get_weight(x, y, perc)).astype(bool) x, y = weights.multiply(x).tocsr(), weights.multiply(y).tocsr() else: weights = None with warnings.catch_warnings(): warnings.simplefilter("ignore") xx_ = prod_sum_obs(x, x) xy_ = prod_sum_obs(x, y) if
fit_offset: n_obs = x.shape[0] if weights is None else sum_obs(weights) x_ = sum_obs(x) / n_obs y_ = sum_obs(y) / n_obs gamma = (xy_ / n_obs - x_ * y_) / (xx_ / n_obs - x_ ** 2) offset = y_ - gamma * x_ # fix negative offsets: idx = offset < 0 gamma[idx] = xy_[idx] / xx_[idx] offset = np.clip(offset, 0, None) else: gamma = xy_ / xx_ offset = np.zeros(x.shape[1]) offset[np.isnan(offset)], gamma[np.isnan(gamma)] = 0, 0 return offset, gamma def optimize_NxN(x, y, fit_offset=False, perc=None): """Just to compare with closed-form solution """ if perc is not None: if not fit_offset and isinstance(perc, (list, tuple)): perc = perc[1] weights = get_weight(x, y, perc).astype(bool) if issparse(weights): weights = weights.A else: weights = None x, y = x.astype(np.float64), y.astype(np.float64) n_vars = x.shape[1] offset, gamma = np.zeros(n_vars), np.zeros(n_vars) for i in range(n_vars): xi = x[:, i] if weights is None else x[:, i][weights[:, i]] yi = y[:, i] if weights is None else y[:, i][weights[:, i]] if fit_offset: offset[i], gamma[i] = minimize(lambda m: np.sum((-yi + xi * m[1] + m[0])**2), method="L-BFGS-B", x0=(0, 0.1), bounds=[(0, None), (None, None)]).x else: gamma[i] = minimize(lambda m: np.sum((-yi + xi * m) ** 2), x0=0.1, method="L-BFGS-B").x offset[np.isnan(offset)], gamma[np.isnan(gamma)] = 0, 0 return offset, gamma def leastsq_generalized(x, y, x2, y2, res_std=None, res2_std=None, fit_offset=False, fit_offset2=False, perc=None): """Solution to the 2-dim generalized least squares: gamma = inv(X'QX)X'QY """ if perc is not None: if not fit_offset and isinstance(perc, (list, tuple)): perc = perc[1] weights = csr_matrix(get_weight(x, y, perc)).astype(bool) x, y = weights.multiply(x).tocsr(), weights.multiply(y).tocsr() n_obs, n_var = x.shape offset, offset_ss = np.zeros(n_var, dtype="float32"), np.zeros(n_var, dtype="float32") gamma = np.ones(n_var, dtype="float32") if (res_std is None) or (res2_std is None): res_std, res2_std = np.ones(n_var), np.ones(n_var) ones, zeros = np.ones(n_obs), np.zeros(n_obs) with warnings.catch_warnings(): warnings.simplefilter("ignore") x, y = np.vstack((make_dense(x)/res_std, x2/res2_std)), np.vstack((make_dense(y)/res_std, y2/res2_std)) if fit_offset and fit_offset2: for i in range(n_var): A = np.c_[np.vstack((np.c_[ones/res_std[i], zeros], np.c_[zeros, ones/res2_std[i]])), x[:, i]] offset[i], offset_ss[i], gamma[i] = np.linalg.pinv(A.T.dot(A)).dot(A.T.dot(y[:, i])) elif fit_offset: for i in range(n_var): A = np.c_[np.hstack((ones/res_std[i], zeros)), x[:, i]] offset[i], gamma[i] = np.linalg.pinv(A.T.dot(A)).dot(A.T.dot(y[:, i])) elif fit_offset2: for i in range(n_var): A = np.c_[np.hstack((zeros, ones/res2_std[i])), x[:, i]] offset_ss[i], gamma[i] = np.linalg.pinv(A.T.dot(A)).dot(A.T.dot(y[:, i])) else: for i in range(n_var): A = np.c_[x[:, i]] gamma[i] = np.linalg.pinv(A.T.dot(A)).dot(A.T.dot(y[:, i])) offset[np.isnan(offset)] = 0 offset_ss[np.isnan(offset_ss)] = 0 gamma[np.isnan(gamma)] = 0 return offset, offset_ss, gamma def maximum_likelihood(Ms, Mu, Mus, Mss, fit_offset=False, fit_offset2=False): """Maximizing the log likelihood using weights according to empirical bayes """ n_obs, n_var = Ms.shape offset, offset_ss = np.zeros(n_var, dtype="float32"), np.zeros(n_var, dtype="float32") gamma = np.ones(n_var, dtype="float32") def sse(A, data, b): sigma = (A.dot(data) - b).std(1) return np.log(sigma).sum() # np.log(np.sqrt(2*np.pi) * sigma).sum() + (.5 * (res/sigma[:, None])**2).sum() if fit_offset and fit_offset2: for i in range(n_var): data = 
np.vstack((Mu[:, i], Ms[:, i], Mus[:, i], Mss[:, i])) offset[i], offset_ss[i], gamma[i] = \ minimize(lambda m: sse(np.array([[1, -m[2], 0, 0], [1, m[2], 2, -2 * m[2]]]), data, b=np.array([[m[0]], [m[1]]])), x0=(1e-4, 1e-4, 1), method="L-BFGS-B").x elif fit_offset: for i in range(n_var): data = np.vstack((Mu[:, i], Ms[:, i], Mus[:, i], Mss[:, i])) offset[i], gamma[i] = \ minimize(lambda m: sse(np.array([[1, -m[1], 0, 0], [1, m[1], 2, -2 * m[1]]]), data, b=np.array([[m[0]], [0]])), x0=(1e-4, 1), method="L-BFGS-B").x elif fit_offset2: for i in range(n_var): data = np.vstack((Mu[:, i], Ms[:, i], Mus[:, i], Mss[:, i])) offset_ss[i], gamma[i] = \ minimize(lambda m: sse(np.array([[1, -m[1], 0, 0], [1, m[1], 2, -2 * m[1]]]), data, b=np.array([[0], [m[0]]])), x0=(1e-4, 1), method="L-BFGS-B").x else: for i in range(n_var): data = np.vstack((Mu[:, i], Ms[:, i], Mus[:, i], Mss[:, i])) gamma[i] = \ minimize(lambda m: sse(np.array([[1, -m, 0, 0], [1, m, 2, -2 * m]]), data, b=0), x0=gamma[i], method="L-BFGS-B").x return offset, offset_ss, gamma
scvelo/tools/rank_velocity_genes.py
from .. import settings from .. import logging as logg from .utils import strings_to_categoricals from scipy.sparse import issparse import numpy as np def get_mean_var(X, ignore_zeros=False, perc=None): data = X.data if issparse(X) else X mask_nans = np.isnan(data) | np.isinf(data) | np.isneginf(data) n_nonzeros = (X != 0).sum(0) n_counts = n_nonzeros if ignore_zeros else X.shape[0] if mask_nans.sum() > 0: if issparse(X): data[np.isnan(data) | np.isinf(data) | np.isneginf(data)] = 0 n_nans = n_nonzeros - (X != 0).sum(0) else: X[mask_nans] = 0 n_nans = mask_nans.sum(0) n_counts -= n_nans if perc is not None: if np.size(perc) < 2: perc = [perc, 100] if perc < 50 else [0, perc] lb, ub = np.percentile(data, perc) data = np.clip(data, lb, ub) mean = (X.sum(0) / n_counts).A1 if issparse(X) else X.sum(0) / n_counts mean_sq = (X.multiply(X).sum(0) / n_counts).A1 if issparse(X) else np.multiply(X, X).sum(0) / n_counts var = (mean_sq - mean ** 2) * (X.shape[0] / (X.shape[0] - 1)) mean = np.nan_to_num(mean) var = np.nan_to_num(var) return mean, var def select_groups(adata, groups='all', key='louvain'): """Get subset of groups in adata.obs[key]. """ strings_to_categoricals(adata) if isinstance(groups, list) and isinstance(groups[0], int): groups = [str(n) for n in groups] categories = adata.obs[key].cat.categories groups_masks = np.array([categories[i] == adata.obs[key].values for i, name in enumerate(categories)]) if groups == 'all': groups = categories.values else: groups_ids = [np.where(categories.values == name)[0][0] for name in groups] groups_masks = groups_masks[groups_ids] groups = categories[groups_ids].values return groups, groups_masks def velocity_clusters(data, vkey='velocity', match_with='clusters', resolution=None, copy=False): adata = data.copy() if copy else data logg.info('computing velocity clusters', r=True) from ..
import AnnData vdata = AnnData(adata.layers[vkey]) vdata.obs = adata.obs.copy() vdata.var = adata.var.copy() tmp_filter = np.ones(vdata.n_vars, dtype=bool) if vkey + '_genes' in vdata.var.keys(): tmp_filter &= vdata.var[vkey + '_genes'] if 'unspliced' in vdata.layers.keys(): n_counts = (vdata.layers['unspliced'] > 0).sum(0) n_counts = n_counts.A1 if issparse(vdata.layers['unspliced']) else n_counts min_counts = min(50, np.percentile(n_counts, 50)) tmp_filter &= (n_counts > min_counts) if 'r2' in vdata.var.keys(): r2 = vdata.var.velocity_r2 min_r2 = np.percentile(r2[r2 > 0], 50) tmp_filter &= (r2 > min_r2) if 'dispersions_norm' in vdata.var.keys(): dispersions = vdata.var.dispersions_norm min_dispersion = np.percentile(dispersions, 20) tmp_filter &= (dispersions > min_dispersion) vdata._inplace_subset_var(tmp_filter) import scanpy.api as sc verbosity_tmp = sc.settings.verbosity sc.settings.verbosity = 0 sc.pp.pca(vdata, n_comps=20, svd_solver='arpack') sc.pp.neighbors(vdata, n_pcs=20) sc.tl.louvain(vdata, resolution=resolution) sc.settings.verbosity = verbosity_tmp if isinstance(match_with, str) and match_with in adata.obs.keys(): from .utils import most_common_in_list vc = vdata.obs['louvain'] for i, cat in enumerate(vc.cat.categories): cells_in_cat = np.where(vc == cat)[0] new_cat = most_common_in_list(adata.obs[match_with][cells_in_cat]) vc = vc.cat.rename_categories({cat: str(new_cat) + ' ' + str(cat)}) vdata.obs['louvain'] = vc adata.obs[vkey + '_clusters'] = vdata.obs['louvain'].copy() del vdata logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added \n' ' \'' + vkey + '_clusters\', clusters based on modularity on velocity field (adata.obs)') return adata if copy else None def rank_velocity_genes(data, vkey='velocity', n_genes=10, groupby=None, match_with=None, resolution=None, min_counts=None, min_r2=None, min_dispersion=None, copy=False): """Rank genes for velocity characterizing groups. Arguments ---------- data : :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Key of velocities computed in `tl.velocity` n_genes : `int`, optional (default: 100) The number of genes that appear in the returned tables. groupby: `str`, `list` or `np.ndarray` (default: `None`) Key of observations grouping to consider. min_counts: `float` (default: None) Minimum count of genes for consideration. min_r2: `float` (default: None) Minimum r2 value of genes for consideration. min_dispersion: `float` (default: None) Minimum dispersion norm value of genes for consideration. copy: `bool` (default: `False`) Return a copy instead of writing to data. Returns ------- Returns or updates `data` with the attributes rank_velocity_genes : `.uns` Structured array to be indexed by group id storing the gene names. Ordered according to scores. velocity_score : `.var` Storing the score for each gene for each group. Ordered according to scores. 
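Examples
--------
A minimal usage sketch (illustrative only; it assumes velocities were computed beforehand with `tl.velocity` and that a `'clusters'` annotation exists in `adata.obs`):

>>> import scvelo as scv
>>> scv.tl.rank_velocity_genes(adata, groupby='clusters', n_genes=10)
>>> adata.uns['rank_velocity_genes']['names']  # top-ranked genes per group
>>> adata.var['velocity_score']                # per-gene scores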
""" adata = data.copy() if copy else data if groupby is None or groupby == 'velocity_clusters': velocity_clusters(adata, vkey=vkey, match_with=match_with, resolution=resolution) groupby = 'velocity_clusters' logg.info('ranking velocity genes', r=True) tmp_filter = np.ones(adata.n_vars, dtype=bool) if vkey + '_genes' in adata.var.keys(): tmp_filter &= adata.var[vkey + '_genes'] if 'unspliced' in adata.layers.keys(): n_counts = (adata.layers['unspliced'] > 0).sum(0) n_counts = n_counts.A1 if issparse(adata.layers['unspliced']) else n_counts min_counts = min(50, np.percentile(n_counts, 50)) if min_counts is None else min_counts tmp_filter &= (n_counts > min_counts) if 'r2' in adata.var.keys(): r2 = adata.var.velocity_r2 min_r2 = .1 if min_r2 is None else min_r2 # np.percentile(r2[r2 > 0], 50) tmp_filter &= (r2 > min_r2) if 'dispersions_norm' in adata.var.keys(): dispersions = adata.var.dispersions_norm min_dispersion = 0 if min_dispersion is None else min_dispersion # np.percentile(dispersions, 20) tmp_filter &= (dispersions > min_dispersion) X = adata[:, tmp_filter].layers[vkey] groups, groups_masks = select_groups(adata[:, tmp_filter], key=groupby) n_groups = groups_masks.shape[0] sizes = groups_masks.sum(1) mean, var = np.zeros((n_groups, X.shape[1])), np.zeros((n_groups, X.shape[1])) for i, mask in enumerate(groups_masks): mean[i], var[i] = get_mean_var(X[mask]) # test each against the union of all other groups rankings_gene_names, rankings_gene_scores, indices = [], [], [] for i in range(n_groups): mask_rest = ~groups_masks[i] mean_rest, var_rest = get_mean_var(X[mask_rest]) size_rest = sizes[i] # else mask_rest.sum() if method == 't-test' scores = (mean[i] - mean_rest) / np.sqrt(var[i] / sizes[i] + var_rest / size_rest) scores = np.nan_to_num(scores) # equivalent to but much faster than np.argsort(scores)[-10:] if n_genes > X.shape[1]: n_genes = X.shape[1] idx = np.argpartition(scores, -n_genes)[-n_genes:] idx = idx[np.argsort(scores[idx])[::-1]] rankings_gene_names.append(adata[:, tmp_filter].var_names[idx].values) rankings_gene_scores.append(scores[idx]) rankings_gene_names = np.array([list(n) for n in rankings_gene_names]) rankings_gene_scores = np.array([list(n) for n in rankings_gene_scores]) all_names = rankings_gene_names.T.flatten() all_scores = rankings_gene_scores.T.flatten() vscore = np.zeros(adata.n_vars, dtype=np.int) for i, name in enumerate(adata.var_names): if name in all_names: vscore[i] = all_scores[np.where(name == all_names)[0][0]] adata.var['velocity_score'] = vscore key = 'rank_velocity_genes' if key not in adata.uns.keys(): adata.uns[key] = {} #adata.uns[key] = {'groups': groups, 'names': rankings_gene_names, 'scores': rankings_gene_scores.round(0)} adata.uns[key] = \ {'names': np.rec.fromarrays([n for n in rankings_gene_names], dtype=[(rn, 'U50') for rn in groups]), 'scores': np.rec.fromarrays([n.round(2) for n in rankings_gene_scores], dtype=[(rn, 'float32') for rn in groups]), 'params': {'groupby': groupby, 'reference': 'rest', 'method': 't-test_overestim_var', 'use_raw': True}} logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added \n' ' \'' + key + '\', sorted scores by group ids (adata.uns)') return adata if copy else None PK>N^u"u"scvelo/tools/run.pyfrom ..preprocessing import filter_and_normalize, moments from . 
import velocity, velocity_graph, velocity_embedding def run_all(data, basis=None, mode=None, min_counts=30, min_counts_u=20, n_top_genes=3000, n_pcs=30, n_neighbors=30, copy=False): from time import time start = time() adata = data.copy() if copy else data filter_and_normalize(adata, min_counts=min_counts, min_counts_u=min_counts_u, n_top_genes=n_top_genes) print("Number of genes to be used:", adata.X.shape[1]) moments(adata, n_neighbors=n_neighbors, n_pcs=n_pcs) velocity(adata, mode=mode) print("Number of genes to be used:", adata.var.velocity_genes.sum()) velocity_graph(adata) if basis is not None: velocity_embedding(adata, basis=basis) print('Total time (seconds): ' + str(round(time() - start, 2))) return adata if copy else None def convert_to_adata(vlm, basis=None): from collections import OrderedDict from .. import AnnData X = vlm.S_norm.T if hasattr(vlm, 'S_norm') else vlm.S_sz.T if hasattr(vlm, 'S_sz') else vlm.S.T # .sparse().T.tocsr() layers = OrderedDict() layers['spliced'] = vlm.S_sz.T if hasattr(vlm, 'S_sz') else vlm.S.T layers['unspliced'] = vlm.U_sz.T if hasattr(vlm, 'U_sz') else vlm.U.T if hasattr(vlm, 'A') and (vlm.A.T.shape == layers['spliced'].shape): layers['ambiguous'] = vlm.A.T if hasattr(vlm, 'velocity'): layers['velocity'] = vlm.velocity.T if hasattr(vlm, 'Sx'): layers['Ms'] = vlm.Sx.T if hasattr(vlm, 'Ux'): layers['Mu'] = vlm.Ux.T obs = dict(vlm.ca) if 'CellID' in obs.keys(): obs['obs_names'] = obs.pop('CellID') var = dict(vlm.ra) if 'Gene' in var.keys(): var['var_names'] = var.pop('Gene') if hasattr(vlm, 'q'): var['velocity_offset'] = vlm.q if hasattr(vlm, 'gammas'): var['velocity_gamma'] = vlm.gammas if hasattr(vlm, 'R2'): var['velocity_r2'] = vlm.R2 adata = AnnData(X, obs=obs, var=var, layers=layers) if hasattr(vlm, 'corrcoef'): adata.uns['velocity_graph'] = vlm.corrcoef if basis is not None: if hasattr(vlm, 'ts'): adata.obsm['X_' + basis] = vlm.ts if hasattr(vlm, 'delta_embedding'): adata.obsm['velocity_' + basis] = vlm.delta_embedding return adata def convert_to_loom(adata, basis=None): from scipy.sparse import issparse import numpy as np import velocyto class VelocytoLoom(velocyto.VelocytoLoom): def __init__(self, adata, basis=None): self.S = adata.layers['spliced'].T self.U = adata.layers['unspliced'].T self.S = self.S.A if issparse(self.S) else self.S self.U = self.U.A if issparse(self.U) else self.U if 'initial_size_spliced' in adata.obs.keys(): self.initial_cell_size = adata.obs['initial_size_spliced'] self.initial_Ucell_size = adata.obs['initial_size_unspliced'] else: self.initial_cell_size = self.S.sum(0) self.initial_Ucell_size = self.U.sum(0) from ..preprocessing.utils import not_yet_normalized if not not_yet_normalized(adata.layers['spliced']): self.S_sz = self.S self.U_sz = self.U self.S_norm = np.log1p(self.S_sz) if 'Ms' in adata.layers.keys(): self.Sx_sz = self.Sx = adata.layers['Ms'].T self.Ux_sz = self.Ux = adata.layers['Mu'].T if 'X_pca' in adata.obsm.keys(): self.pcs = adata.obsm['X_pca'] if 'velocity' in adata.layers.keys(): self.velocity = adata.layers['velocity'] if 'ambiguous' in adata.layers.keys(): self.A = adata.layers['ambiguous'].T if issparse(self.A): self.A = self.A.A self.ca = dict() self.ra = dict() self.ca['CellID'] = np.array(adata.obs_names) self.ra['Gene'] = np.array(adata.var_names) for key in adata.obs.keys(): self.ca[key] = np.array(adata.obs[key].values) for key in adata.var.keys(): self.ra[key] = np.array(adata.var[key].values) basis = 'umap' if basis is None else basis if isinstance(basis, str) and 'X_' + basis in 
adata.obsm.keys(): if 'X_' + basis in adata.obsm.keys(): self.ts = adata.obsm['X_' + basis] if 'velocity_' + basis in adata.obsm.keys(): self.delta_embedding = adata.obsm['velocity_' + basis] if 'clusters' in self.ca: self.set_clusters(self.ca['clusters']) elif 'louvain' in self.ca: self.set_clusters(self.ca['louvain']) def filter_and_normalize(self, min_counts=3, min_counts_u=3, min_cells=0, min_cells_u=0, n_top_genes=None, max_expr_avg=20): # counterpart to scv.pp.filter_and_normalize() self.score_detection_levels(min_expr_counts=min_counts, min_expr_counts_U=min_counts_u, min_cells_express=min_cells, min_cells_express_U=min_cells_u) self.filter_genes(by_detection_levels=True) if n_top_genes is not None and n_top_genes < self.S.shape[0]: self.score_cv_vs_mean(n_top_genes, max_expr_avg=max_expr_avg) self.filter_genes(by_cv_vs_mean=True) self._normalize_S(relative_size=self.initial_cell_size, target_size=np.median(self.initial_cell_size)) self._normalize_U(relative_size=self.initial_Ucell_size, target_size=np.median(self.initial_Ucell_size)) self.S_norm = np.log1p(self.S_sz) print("Number of genes to be used:", self.S.shape[0]) def impute(self, n_pcs=30, n_neighbors=30, balanced=False, renormalize=False): # counterpart to scv.pp.moments(adata, method='sklearn', mode='distances') if not hasattr(self, 'pcs'): self.perform_PCA(n_components=n_pcs) k = n_neighbors self.knn_imputation(n_pca_dims=n_pcs, k=k, balanced=balanced, b_sight=k * 8, b_maxl=k * 4) if renormalize: self.normalize_median() def velocity_estimation(self, fit_offset=False, perc=[5, 95], filter_genes=False): self.fit_gammas(limit_gamma=False, fit_offset=fit_offset, weighted=(perc is not None), maxmin_perc=perc) if filter_genes: self.filter_genes_good_fit() self.predict_U() self.calculate_velocity() self.calculate_shift() self.extrapolate_cell_at_t() print("Number of genes to be used:", self.S.shape[0]) def velocity_graph(self, n_neighbors=100, transform='linear', sampled_fraction=.5, expression_scaling=False, sigma_corr=.05, calculate_randomized=False): if not hasattr(self, 'ts'): raise ValueError('Compute embedding first.') else: # counterpart to scv.tl.velocity_graph() self.estimate_transition_prob(hidim="Sx_sz", embed="ts", transform=transform, n_neighbors=n_neighbors, knn_random=True, sampled_fraction=sampled_fraction, calculate_randomized=calculate_randomized) # counterpart to scv.tl.velocity_embedding() self.calculate_embedding_shift(sigma_corr=sigma_corr, expression_scaling=expression_scaling) def velocity_embedding(self, smooth=0.5, steps=(50, 50), n_neighbors=100): self.calculate_grid_arrows(smooth=smooth, steps=steps, n_neighbors=n_neighbors) def run_all(self, min_counts=3, min_counts_u=3, n_pcs=30, n_neighbors=30, n_neighbors_graph=100, n_top_genes=None, fit_offset=False, limit_gamma=False, transform='linear', expression_scaling=False,): from time import time start = time() self.filter_and_normalize(min_counts=min_counts, min_counts_u=min_counts_u, n_top_genes=n_top_genes) print('Preprocessing: ' + str(round(time() - start, 2))) timestamp = time() self.impute(n_pcs=n_pcs, n_neighbors=n_neighbors) print('Imputation: ' + str(round(time() - timestamp, 2))) timestamp = time() self.velocity_estimation(limit_gamma=limit_gamma, fit_offset=fit_offset) print('Velocity Estimation: ' + str(round(time() - timestamp, 2))) timestamp = time() self.velocity_graph(n_neighbors=n_neighbors_graph, transform=transform, expression_scaling=expression_scaling) print('Velocity Graph: ' + str(round(time() - timestamp, 2))) print('Total: ' + 
str(round(time() - start, 2))) return VelocytoLoom(adata, basis) PK;|YN ##scvelo/tools/terminal_states.pyfrom .. import settings from .. import logging as logg from ..preprocessing.moments import get_connectivities from .velocity_graph import VelocityGraph from .transition_matrix import transition_matrix from .utils import scale, groups_to_bool, strings_to_categoricals from scipy.sparse import linalg, csr_matrix, issparse import numpy as np def cell_fate(data, groupby='clusters', disconnected_groups=None, self_transitions=False, n_neighbors=None, copy=False): """Computes individual cell endpoints Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. groupby: `str` (default: `'clusters'`) Key to which to assign the fates. disconnected_groups: list of `str` (default: `None`) Which groups to treat as disconnected for fate assignment. n_neighbors: `int` (default: `None`) Number of neighbors to restrict transitions to. copy: `bool` (default: `False`) Return a copy instead of writing to `adata`. Returns ------- Returns or updates `adata` with the attributes cell_fate: `.obs` most likely cell fate for each individual cell cell_fate_confidence: `.obs` confidence of transitioning to the assigned fate """ adata = data.copy() if copy else data logg.info('computing cell fates', r=True) n_neighbors = 10 if n_neighbors is None else n_neighbors _adata = adata.copy() vgraph = VelocityGraph(_adata, n_neighbors=n_neighbors, approx=True, n_recurse_neighbors=1) vgraph.compute_cosines() _adata.uns['velocity_graph'] = vgraph.graph _adata.uns['velocity_graph_neg'] = vgraph.graph_neg T = transition_matrix(_adata, self_transitions=self_transitions) I = np.eye(_adata.n_obs) fate = np.linalg.inv(I - T) if issparse(T): fate = fate.A cell_fates = np.array(_adata.obs[groupby][fate.argmax(1)]) if disconnected_groups is not None: idx = _adata.obs[groupby].isin(disconnected_groups) cell_fates[idx] = _adata.obs[groupby][idx] adata.obs['cell_fate'] = cell_fates adata.obs['cell_fate_confidence'] = fate.max(1) / fate.sum(1) strings_to_categoricals(adata) logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added\n' ' \'cell_fate\', most likely cell fate (adata.obs)\n' ' \'cell_fate_confidence\', confidence of transitioning to the assigned fate (adata.obs)') return adata if copy else None def cell_origin(data, groupby='clusters', disconnected_groups=None, self_transitions=False, n_neighbors=None, copy=False): """Computes individual cell root points Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. groupby: `str` (default: `'clusters'`) Key to which to assign the fates. disconnected_groups: list of `str` (default: `None`) Which groups to treat as disconnected for fate assignment. n_neighbors: `int` (default: `None`) Number of neighbors to restrict transitions to. copy: `bool` (default: `False`) Return a copy instead of writing to `adata`. 
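A minimal usage sketch (illustrative; it assumes velocities were computed with `tl.velocity` on a preprocessed `adata`):

>>> from scvelo.tools.terminal_states import cell_origin
>>> cell_origin(adata, groupby='clusters')
>>> adata.obs[['cell_origin', 'cell_origin_confidence']]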
Returns ------- Returns or updates `adata` with the attributes cell_origin: `.obs` most likely cell origin for each individual cell cell_origin_confidence: `.obs` confidence of coming from assigned origin """ adata = data.copy() if copy else data logg.info('computing cell fates', r=True) n_neighbors = 10 if n_neighbors is None else n_neighbors _adata = adata.copy() vgraph = VelocityGraph(_adata, n_neighbors=n_neighbors, approx=True, n_recurse_neighbors=1) vgraph.compute_cosines() _adata.uns['velocity_graph'] = vgraph.graph _adata.uns['velocity_graph_neg'] = vgraph.graph_neg T = transition_matrix(_adata, self_transitions=self_transitions, backward=True) I = np.eye(_adata.n_obs) fate = np.linalg.inv(I - T) if issparse(T): fate = fate.A cell_fates = np.array(_adata.obs[groupby][fate.argmax(1)]) if disconnected_groups is not None: idx = _adata.obs[groupby].isin(disconnected_groups) cell_fates[idx] = _adata.obs[groupby][idx] adata.obs['cell_origin'] = cell_fates adata.obs['cell_origin_confidence'] = fate.max(1) / fate.sum(1) strings_to_categoricals(adata) logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added\n' ' \'cell_origin\', most likely cell origin (adata.obs)\n' ' \'cell_origin_confidence\', confidence of coming from the assigned origin (adata.obs)') def eigs(T, k=10, eps=1e-3, perc=None): try: eigvals, eigvecs = linalg.eigs(T.T, k=k, which='LR') # find k eigs with largest real part p = np.argsort(eigvals)[::-1] # sort in descending order of eigenvalues eigvals = eigvals.real[p] eigvecs = eigvecs.real[:, p] idx = (eigvals >= 1 - eps) # select eigenvectors with eigenvalue of 1 eigvals = eigvals[idx] eigvecs = np.absolute(eigvecs[:, idx]) if perc is not None: lbs, ubs = np.percentile(eigvecs, perc, axis=0) eigvecs[eigvecs < lbs] = 0 eigvecs = np.clip(eigvecs, 0, ubs) eigvecs /= eigvecs.max(0) except: eigvals, eigvecs = np.empty(0), np.zeros(shape=(T.shape[0], 0)) return eigvals, eigvecs def write_to_obs(adata, key, vals, cell_subset=None): if cell_subset is None: adata.obs[key] = vals else: vals_all = adata.obs[key].copy() if key in adata.obs.keys() else np.zeros(adata.n_obs) vals_all[cell_subset] = vals adata.obs[key] = vals_all def terminal_states(data, vkey='velocity', groupby=None, groups=None, self_transitions=False, basis=None, weight_diffusion=0, scale_diffusion=1, eps=1e-3, copy=False): """Computes terminal states (root and end points). Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Name of velocity estimates to be used. self_transitions: `bool` (default: `False`) Allow transitions from one node to itself. basis: `str` (default: `None`) Basis to use. weight_diffusion: `float` (default: 0) Relative weight to be given to diffusion kernel (Brownian motion) scale_diffusion: `float` (default: 1) Scale of diffusion kernel. eps: `float` (default: 1e-3) Tolerance for eigenvalue selection. copy: `bool` (default: `False`) Return a copy instead of writing to data. Returns ------- Returns or updates `data` with the attributes root: `.obs` sparse matrix with transition probabilities. end: `.obs` sparse matrix with transition probabilities. 
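Examples
--------
A minimal usage sketch (illustrative; it assumes `tl.velocity` and `tl.velocity_graph` have been run so that a velocity graph is available):

>>> import scvelo as scv
>>> scv.tl.terminal_states(adata)
>>> adata.obs[['root_cells', 'end_points']]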
""" adata = data.copy() if copy else data logg.info('computing terminal states', r=True) strings_to_categoricals(adata) groupby = 'cell_fate' if groupby is None and 'cell_fate' in adata.obs.keys() else groupby categories = adata.obs[groupby].cat.categories if groupby is not None and groups is None else [None] for cat in categories: groups = cat if cat is not None else groups cell_subset = groups_to_bool(adata, groups=groups, groupby=groupby) _adata = adata if groups is None else adata[cell_subset] connectivities = get_connectivities(_adata, 'distances') T = transition_matrix(_adata, vkey=vkey, basis=basis, weight_diffusion=weight_diffusion, scale_diffusion=scale_diffusion, self_transitions=self_transitions, backward=True) eigvecs_roots = eigs(T, eps=eps, perc=[2, 98])[1] roots = csr_matrix.dot(connectivities, eigvecs_roots).sum(1) roots = scale(np.clip(roots, 0, np.percentile(roots, 98))) write_to_obs(adata, 'root_cells', roots, cell_subset) T = transition_matrix(_adata, vkey=vkey, basis=basis, weight_diffusion=weight_diffusion, scale_diffusion=scale_diffusion, self_transitions=self_transitions, backward=False) eigvecs_ends = eigs(T, eps=eps, perc=[2, 98])[1] ends = csr_matrix.dot(connectivities, eigvecs_ends).sum(1) ends = scale(np.clip(ends, 0, np.percentile(ends, 98))) write_to_obs(adata, 'end_points', ends, cell_subset) n_roots, n_ends = eigvecs_roots.shape[1], eigvecs_ends.shape[1] groups_str = ' (' + groups + ')' if isinstance(groups, str) else '' logg.info(' identified ' + str(n_roots) + ' root cells and ' + str(n_ends) + ' end points' + groups_str) logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added\n' ' \'root_cells\', root cells of Markov diffusion process (adata.obs)\n' ' \'end_points\', end points of Markov diffusion process (adata.obs)') return adata if copy else None PKCN !scvelo/tools/transition_matrix.pyfrom ..preprocessing.neighbors import get_connectivities from .utils import normalize from scipy.spatial.distance import pdist, squareform from scipy.sparse import csr_matrix import numpy as np def transition_matrix(adata, vkey='velocity', basis=None, backward=False, self_transitions=True, scale=10, perc=None, use_negative_cosines=False, weight_diffusion=0, scale_diffusion=1, weight_indirect_neighbors=None, n_neighbors=None, vgraph=None): """Computes transition probabilities from velocity graph Arguments --------- adata: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Name of velocity estimates to be used. basis: `str` or `None` (default: `None`) Restrict transition to embedding if specified backward: `bool` (default: `False`) Whether to use the transition matrix to push forward (`False`) or to pull backward (`True`) scale: `float` (default: 10) Scale parameter of gaussian kernel. weight_diffusion: `float` (default: 0) Relative weight to be given to diffusion kernel (Brownian motion) scale_diffusion: `float` (default: 1) Scale of diffusion kernel. Returns ------- Returns sparse matrix with transition probabilities. 
""" if vkey+'_graph' not in adata.uns: raise ValueError('You need to run `tl.velocity_graph` first to compute cosine correlations.') graph = csr_matrix(adata.uns[vkey + '_graph']).copy() if vgraph is None else vgraph.copy() if self_transitions: confidence = graph.max(1).A.flatten() ub = np.percentile(confidence, 98) self_prob = np.clip(ub - confidence, 0, 1) graph.setdiag(self_prob) T = np.expm1(graph * scale) # equivalent to np.exp(graph.A * scale) - 1 if vkey + '_graph_neg' in adata.uns.keys(): graph_neg = adata.uns[vkey + '_graph_neg'] if use_negative_cosines: T -= np.expm1(-graph_neg * scale) else: T += np.expm1(graph_neg * scale) T.data += 1 # weight direct and indirect (recursed) neighbors if 'neighbors' in adata.uns.keys() and weight_indirect_neighbors is not None and weight_indirect_neighbors < 1: direct_neighbors = adata.uns['neighbors']['distances'] > 0 direct_neighbors.setdiag(1) w = weight_indirect_neighbors T = w * T + (1-w) * direct_neighbors.multiply(T) if backward: T = T.T T = normalize(T) if n_neighbors is not None: T = T.multiply(get_connectivities(adata, mode='distances', n_neighbors=n_neighbors, recurse_neighbors=True)) if perc is not None: threshold = np.percentile(T.data, perc) T.data[T.data < threshold] = 0 T.eliminate_zeros() if 'X_' + str(basis) in adata.obsm.keys(): dists_emb = (T > 0).multiply(squareform(pdist(adata.obsm['X_' + basis]))) scale_diffusion *= dists_emb.data.mean() diffusion_kernel = dists_emb.copy() diffusion_kernel.data = np.exp(-.5 * dists_emb.data ** 2 / scale_diffusion ** 2) T = T.multiply(diffusion_kernel) # combine velocity based kernel with diffusion based kernel if 0 < weight_diffusion < 1: # add another diffusion kernel (Brownian motion - like) diffusion_kernel.data = np.exp(-.5 * dists_emb.data ** 2 / (scale_diffusion/2) ** 2) T = (1-weight_diffusion) * T + weight_diffusion * diffusion_kernel T = normalize(T) return TPK4NOWsJ#J#scvelo/tools/utils.pyfrom scipy.sparse import csr_matrix, issparse import pandas as pd import numpy as np import warnings def mean(x, axis=0): return x.mean(axis).A1 if issparse(x) else x.mean(axis) def make_dense(X): XA = X.A if issparse(X) and X.ndim == 2 else X.A1 if issparse(X) else X if XA.ndim == 2: XA = XA[0] if XA.shape[0] == 1 else XA[:, 0] if XA.shape[1] == 1 else XA return XA def sum_obs(A): """summation over axis 0 (obs) equivalent to np.sum(A, 0) """ return A.sum(0).A1 if issparse(A) else np.einsum('ij -> j', A) def prod_sum_obs(A, B): """dot product and sum over axis 0 (obs) equivalent to np.sum(A * B, 0) """ return A.multiply(B).sum(0).A1 if issparse(A) else np.einsum('ij, ij -> j', A, B) def prod_sum_var(A, B): """dot product and sum over axis 1 (var) equivalent to np.sum(A * B, 1) """ return A.multiply(B).sum(1).A1 if issparse(A) else np.einsum('ij, ij -> i', A, B) def norm(A): """computes the L2-norm along axis 1 (e.g. genes or embedding dimensions) equivalent to np.linalg.norm(A, axis=1) """ return np.sqrt(A.multiply(A).sum(1).A1) if issparse(A) else np.sqrt(np.einsum('ij, ij -> i', A, A)) def vector_norm(x): """computes the L2-norm along axis 1 (e.g. 
genes or embedding dimensions) equivalent to np.linalg.norm(A, axis=1) """ return np.sqrt(np.einsum('i, i -> ', x, x)) def R_squared(residual, total): with warnings.catch_warnings(): warnings.simplefilter("ignore") r2 = np.ones(residual.shape[1]) - prod_sum_obs(residual, residual) / prod_sum_obs(total, total) r2[np.isnan(r2)] = 0 return r2 def cosine_correlation(dX, Vi): dX -= dX.mean(-1)[:, None] Vi_norm = vector_norm(Vi) with warnings.catch_warnings(): warnings.simplefilter("ignore") result = np.zeros(dX.shape[0]) if Vi_norm == 0 else np.einsum('ij, j', dX, Vi) / (norm(dX) * Vi_norm)[None, :] return result def normalize(X): with warnings.catch_warnings(): warnings.simplefilter("ignore") X = X.multiply(csr_matrix(1. / np.abs(X).sum(1))) if issparse(X) else X / X.sum(1) return X def scale(X, min=0, max=1): idx = np.isfinite(X) if any(idx): X = X - X[idx].min() + min xmax = X[idx].max() X = X / xmax * max if xmax != 0 else X * max return X def get_indices(dist, n_neighbors=None): D = dist.copy() n_counts = (D > 0).sum(1).A1 if issparse(D) else (D > 0).sum(1) n_neighbors = n_counts.min() if n_neighbors is None else min(n_counts.min(), n_neighbors) rows = np.where(n_counts > n_neighbors)[0] cumsum_neighs = np.insert(n_counts.cumsum(), 0, 0) dat = D.data for row in rows: n0, n1 = cumsum_neighs[row], cumsum_neighs[row + 1] rm_idx = n0 + dat[n0:n1].argsort()[n_neighbors:] dat[rm_idx] = 0 D.eliminate_zeros() indices = D.indices.reshape((-1, n_neighbors)) return indices, D def get_iterative_indices(indices, index, n_recurse_neighbors=2, max_neighs=None): def iterate_indices(indices, index, n_recurse_neighbors): return indices[iterate_indices(indices, index, n_recurse_neighbors - 1)] \ if n_recurse_neighbors > 1 else indices[index] indices = np.unique(iterate_indices(indices, index, n_recurse_neighbors)) if max_neighs is not None and len(indices) > max_neighs: indices = np.random.choice(indices, max_neighs, replace=False) return indices def groups_to_bool(adata, groups, groupby=None): groups = [groups] if isinstance(groups, str) else groups if isinstance(groups, (list, tuple, np.ndarray, np.record)): groupby = groupby if groupby in adata.obs.keys() else 'clusters' if 'clusters' in adata.obs.keys() \ else 'louvain' if 'louvain' in adata.obs.keys() else None if groupby is not None: groups = np.array([key in groups for key in adata.obs[groupby]]) else: raise ValueError('groupby attribute not valid.') return groups def most_common_in_list(lst): lst = list(lst) return max(set(lst), key=lst.count) def randomized_velocity(adata, vkey='velocity', add_key='velocity_random'): V_rnd = adata.layers[vkey].copy() for i in range(V_rnd.shape[1]): np.random.shuffle(V_rnd[:, i]) V_rnd[:, i] = V_rnd[:, i] * np.random.choice(np.array([+1, -1]), size=V_rnd.shape[0]) adata.layers[add_key] = V_rnd from .velocity_graph import velocity_graph from .velocity_embedding import velocity_embedding velocity_graph(adata, vkey=add_key) velocity_embedding(adata, vkey=add_key, autoscale=False) def extract_int_from_str(array): def str_to_int(item): num = "".join(filter(str.isdigit, item)) num = int(num) if len(num) > 0 else -1 return num if isinstance(array, str): nums = str_to_int(array) elif len(array) > 1 and isinstance(array[0], str): nums = [] for item in array: nums.append(str_to_int(item)) else: nums = array nums = pd.Categorical(nums) if array.dtype == 'category' else np.array(nums) return nums def strings_to_categoricals(adata): """Transform string annotations to categoricals. 
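For example (illustrative), a string column such as `adata.obs['clusters']` with fewer than 100 distinct values is converted in place to a pandas Categorical:

>>> from scvelo.tools.utils import strings_to_categoricals
>>> strings_to_categoricals(adata)
>>> adata.obs['clusters'].dtype  # category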
""" from pandas import Categorical def is_valid_dtype(values): from pandas.api.types import is_string_dtype, is_integer_dtype, is_bool_dtype return is_string_dtype(values) or is_integer_dtype(values) or is_bool_dtype(values) for df in [adata.obs, adata.var]: string_cols = [key for key in df.columns if is_valid_dtype(df[key])] for key in string_cols: c = df[key] c = Categorical(c) if len(c.categories) < min(len(c), 100): df[key] = c def merge_groups(adata, key, map_groups, key_added=None, map_colors=None): strings_to_categoricals(adata) if len(map_groups) != len(adata.obs[key].cat.categories): map_coarse = {} for c in adata.obs[key].cat.categories: for group in map_groups: if any(cluster == c for cluster in map_groups[group]): map_coarse[c] = group if c not in map_coarse: map_coarse[c] = c map_groups = map_coarse if key_added is None: key_added = key + '_coarse' from pandas.api.types import CategoricalDtype adata.obs[key_added] = adata.obs[key].map(map_groups).astype(CategoricalDtype()) old_categories = adata.obs[key].cat.categories new_categories = adata.obs[key_added].cat.categories # map_colors is passed if map_colors is not None: old_colors = None if key + '_colors' in adata.uns: old_colors = adata.uns[key + '_colors'] new_colors = [] for group in adata.obs[key_added].cat.categories: if group in map_colors: new_colors.append(map_colors[group]) elif group in old_categories and old_colors is not None: new_colors.append(old_colors[old_categories.get_loc(group)]) else: raise ValueError('You didn\'t specify a color for {}.'.format(group)) adata.uns[key_added + '_colors'] = new_colors # map_colors is not passed elif key + '_colors' in adata.uns: old_colors = adata.uns[key + '_colors'] inverse_map_groups = {g: [] for g in new_categories} for old_group in old_categories: inverse_map_groups[map_groups[old_group]].append(old_group) new_colors = [] for group in new_categories: # take the largest of the old groups old_group = adata.obs[key][adata.obs[key].isin( inverse_map_groups[group])].value_counts().index[0] new_colors.append(old_colors[old_categories.get_loc(old_group)]) adata.uns[key_added + '_colors'] = new_colors def cutoff_small_velocities(adata, vkey='velocity', key_added='velocity_cut', frac_of_max=.5, use_raw=False): x = adata.layers['spliced'] if use_raw else adata.layers['Ms'] y = adata.layers['unspliced'] if use_raw else adata.layers['Mu'] x_max = x.max(0).A[0] if issparse(x) else x.max(0) y_max = y.max(0).A[0] if issparse(y) else y.max(0) xy_norm = x / np.clip(x_max, 1e-3, None) + y / np.clip(y_max, 1e-3, None) W = xy_norm >= np.percentile(xy_norm, 98, axis=0) * frac_of_max adata.layers[key_added] = csr_matrix(W).multiply(adata.layers[vkey]).tocsr() from .velocity_graph import velocity_graph from .velocity_embedding import velocity_embedding velocity_graph(adata, vkey=key_added, approx=True) velocity_embedding(adata, vkey=key_added) def make_unique_list(key, allow_array=False): from pandas import unique, Index if isinstance(key, Index): key = key.tolist() is_list = isinstance(key, (list, tuple, np.record)) if allow_array else isinstance(key, (list, tuple, np.ndarray, np.record)) is_list_of_str = is_list and all(isinstance(item, str) for item in key) return unique(key) if is_list_of_str else key if is_list and len(key) < 20 else [key] PKN''scvelo/tools/velocity.pyfrom .. import settings from .. 
import logging as logg from ..preprocessing.moments import moments, second_order_moments from .optimization import leastsq_NxN, leastsq_generalized, maximum_likelihood from .utils import R_squared, groups_to_bool, make_dense, strings_to_categoricals import numpy as np import warnings warnings.simplefilter(action='ignore', category=FutureWarning) class Velocity: def __init__(self, adata=None, Ms=None, Mu=None, groups_for_fit=None, groupby=None, residual=None, use_raw=False,): self._adata = adata self._Ms = adata.layers['spliced'] if use_raw else adata.layers['Ms'] if Ms is None else Ms self._Mu = adata.layers['unspliced'] if use_raw else adata.layers['Mu'] if Mu is None else Mu self._Ms, self._Mu = make_dense(self._Ms), make_dense(self._Mu) n_obs, n_vars = self._Ms.shape self._residual, self._residual2 = residual, None self._offset, self._offset2 = np.zeros(n_vars, dtype=np.float32), np.zeros(n_vars, dtype=np.float32) self._gamma, self._r2 = np.zeros(n_vars, dtype=np.float32), np.zeros(n_vars, dtype=np.float32) self._beta, self._velocity_genes = np.ones(n_vars, dtype=np.float32), np.ones(n_vars, dtype=bool) self._groups_for_fit = groups_to_bool(adata, groups_for_fit, groupby) def compute_deterministic(self, fit_offset=False, perc=None): Ms = self._Ms if self._groups_for_fit is None else self._Ms[self._groups_for_fit] Mu = self._Mu if self._groups_for_fit is None else self._Mu[self._groups_for_fit] self._offset, self._gamma = leastsq_NxN(Ms, Mu, fit_offset, perc) self._residual = self._Mu - self._gamma * self._Ms if fit_offset: self._residual -= self._offset self._r2 = R_squared(self._residual, total=self._Mu - self._Mu.mean(0)) self._velocity_genes = (self._r2 > .01) & (self._gamma > .01) def compute_stochastic(self, fit_offset=False, fit_offset2=False, mode=None, perc=None): if self._residual is None: self.compute_deterministic(fit_offset=fit_offset, perc=perc) idx = self._velocity_genes is_subset = True if len(set(idx)) > 1 else False _adata = self._adata[:, idx] if is_subset else self._adata _Ms = self._Ms[:, idx] if is_subset else self._Ms _Mu = self._Mu[:, idx] if is_subset else self._Mu _residual = self._residual[:, idx] if is_subset else self._residual _Mss, _Mus = second_order_moments(_adata) var_ss = 2 * _Mss - _Ms cov_us = 2 * _Mus + _Mu _offset2, _gamma2 = leastsq_NxN(var_ss, cov_us, fit_offset2) # initialize covariance matrix res_std = _residual.std(0) res2_std = (cov_us - _gamma2 * var_ss - _offset2).std(0) # solve multiple regression self._offset[idx], self._offset2[idx], self._gamma[idx] = \ maximum_likelihood(_Ms, _Mu, _Mus, _Mss, fit_offset, fit_offset2) if mode == 'bayes' \ else leastsq_generalized(_Ms, _Mu, var_ss, cov_us, res_std, res2_std, fit_offset, fit_offset2, perc) self._residual = self._Mu - self._gamma * self._Ms if fit_offset: self._residual -= self._offset _residual2 = (cov_us - 2 * _Ms * _Mu) - self._gamma[idx] * (var_ss - 2 * _Ms ** 2) if fit_offset: _residual2 += 2 * self._offset[idx] * _Ms if fit_offset2: _residual2 -= self._offset2[idx] if is_subset: self._residual2 = np.zeros(self._Ms.shape, dtype=np.float32) self._residual2[:, idx] = _residual2 else: self._residual2 = _residual2 # if mode == 'alpha': # Muu = second_order_moments_u(adata) # offset2u, alpha = leastsq_NxN(np.ones(Mu.shape) + 2 * Mu, 2 * Muu - Mu) # pars.extend([offset2u, alpha]) # pars_str.extend(['_offset2u', '_alpha']) def get_pars(self): return self._offset, self._offset2, self._beta, self._gamma, self._r2, self._velocity_genes def get_pars_names(self): return ['_offset', '_offset2', 
'_beta', '_gamma', '_r2', '_genes'] def write_residuals(adata, vkey, residual=None, cell_subset=None): if residual is not None: if cell_subset is None: adata.layers[vkey] = residual else: if vkey not in adata.layers.keys(): adata.layers[vkey] = np.zeros(adata.shape, dtype=np.float32) adata.layers[vkey][cell_subset] = residual def write_pars(adata, vkey, pars, pars_names, add_key=None): for i, key in enumerate(pars_names): key = vkey + key + '_' + add_key if add_key is not None else vkey + key if len(set(pars[i])) > 1: adata.var[key] = pars[i] elif key in adata.var.keys(): del adata.var[key] def velocity(data, vkey='velocity', mode=None, fit_offset=False, fit_offset2=False, filter_genes=False, groups=None, groupby=None, groups_for_fit=None, use_raw=False, perc=[5, 95], copy=False): """Estimates velocities in a gene-specific manner Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Name under which to refer to the computed velocities for `velocity_graph` and `velocity_embedding`. mode: `'deterministic'`, `'stochastic'` or `'bayes'` (default: `'stochastic'`) Whether to run the estimation using the deterministic or stochastic model of transcriptional dynamics. `'bayes'` solves the stochastic model and accounts for heteroscedasticity, but is slower than `'stochastic'`. fit_offset: `bool` (default: `False`) Whether to fit with offset for first order moment dynamics. fit_offset2: `bool`, (default: `False`) Whether to fit with offset for second order moment dynamics. filter_genes: `bool` (default: `True`) Whether to remove genes that are not used for further velocity analysis. groups: `str`, `list` (default: `None`) Subset of groups, e.g. [‘g1’, ‘g2’, ‘g3’], to which velocity analysis shall be restricted. groupby: `str`, `list` or `np.ndarray` (default: `None`) Key of observations grouping to consider. groups_for_fit: `str`, `list` or `np.ndarray` (default: `None`) Subset of groups, e.g. [‘g1’, ‘g2’, ‘g3’], to which steady-state fitting shall be restricted. use_raw: `bool` (default: `False`) Whether to use raw data for estimation. perc: `float` (default: `None`) Percentile, e.g. 98, upon for extreme quantile fit (to better capture steady states for velocity estimation). copy: `bool` (default: `False`) Return a copy instead of writing to `adata`. 
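A minimal usage sketch (illustrative; it assumes a preprocessed `adata` with spliced/unspliced counts, e.g. obtained via `pp.filter_and_normalize` and `pp.moments`):

>>> import scvelo as scv
>>> scv.tl.velocity(adata, mode='stochastic')
>>> adata.layers['velocity']  # velocity vectors per cell and gene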
Returns ------- Returns or updates `adata` with the attributes velocity: `.layers` velocity vectors for each individual cell variance_velocity: `.layers` velocity vectors for the cell variances velocity_offset, velocity_beta, velocity_gamma, velocity_r2: `.var` parameters """ adata = data.copy() if copy else data if not use_raw and 'Ms' not in adata.layers.keys(): moments(adata) logg.info('computing velocities', r=True) strings_to_categoricals(adata) categories = adata.obs[groupby].cat.categories \ if groupby is not None and groups is None and groups_for_fit is None else [None] for cat in categories: groups = cat if cat is not None else groups cell_subset = groups_to_bool(adata, groups, groupby) _adata = adata if groups is None else adata[cell_subset] velo = Velocity(_adata, groups_for_fit=groups_for_fit, groupby=groupby, use_raw=use_raw) velo.compute_deterministic(fit_offset, perc=perc) if any([mode is not None and mode in item for item in ['stochastic', 'bayes', 'alpha']]): if filter_genes and len(set(velo._velocity_genes)) > 1: adata._inplace_subset_var(velo._velocity_genes) residual = velo._residual[:, velo._velocity_genes] _adata = adata if groups is None else adata[cell_subset] velo = Velocity(_adata, residual=residual, groups_for_fit=groups_for_fit, groupby=groupby) velo.compute_stochastic(fit_offset, fit_offset2, mode, perc=perc) write_residuals(adata, vkey, velo._residual, cell_subset) write_residuals(adata, 'variance_' + vkey, velo._residual2, cell_subset) write_pars(adata, vkey, velo.get_pars(), velo.get_pars_names(), add_key=cat) if filter_genes and len(set(velo._velocity_genes)) > 1: adata._inplace_subset_var(velo._velocity_genes) logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint('added \n' ' \'' + vkey + '\', velocity vectors for each individual cell (adata.layers)') return adata if copy else None def velocity_genes(data, vkey='velocity', min_r2=0.01, highly_variable=None, copy=False): """Estimates velocities in a gene-specific manner Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Name under which to refer to the computed velocities for `velocity_graph` and `velocity_embedding`. min_r2: `float` (default: 0.01) Minimum threshold for coefficient of determination highly_variable: `bool` (default: `None`) Whether to include highly variable genes only. copy: `bool` (default: `False`) Return a copy instead of writing to `adata`. Returns ------- Updates `adata` attributes velocity_genes: `.var` genes to be used for further velocity analysis (velocity graph and embedding) """ adata = data.copy() if copy else data if vkey + '_genes' not in adata.var.keys(): velocity(data, vkey) adata.var[vkey + '_genes'] = np.array(adata.var[vkey + '_genes'], dtype=bool) & (adata.var[vkey + '_r2'] > min_r2) if highly_variable and 'highly_variable' in adata.var.keys(): adata.var[vkey + '_genes'] &= adata.var['highly_variable'] logg.info('Number of obtained velocity_genes:', np.sum(adata.var[vkey + '_genes'])) return adata if copy else None PKC[NH#scvelo/tools/velocity_confidence.pyfrom .. import logging as logg from ..preprocessing.moments import moments from ..preprocessing.neighbors import neighbors from .utils import prod_sum_var, norm, get_indices from .transition_matrix import transition_matrix import numpy as np def velocity_confidence(data, vkey='velocity', copy=False): """Computes confidences of velocities. Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. 
vkey: `str` (default: `'velocity'`) Name of velocity estimates to be used. copy: `bool` (default: `False`) Return a copy instead of writing to adata. Returns ------- Returns or updates `adata` with the attributes velocity_length: `.obs` Length of the velocity vectors for each individual cell velocity_confidence: `.obs` Confidence for each cell """ adata = data.copy() if copy else data if vkey not in adata.layers.keys(): raise ValueError( 'You need to run `tl.velocity` first.') idx = np.array(adata.var[vkey + '_genes'].values, dtype=bool) X, V = adata.layers['Ms'][:, idx].copy(), adata.layers[vkey][:, idx].copy() indices = get_indices(dist=adata.uns['neighbors']['distances'])[0] V -= V.mean(1)[:, None] V_norm = norm(V) R = np.zeros(adata.n_obs) for i in range(adata.n_obs): Vi_neighs = V[indices[i]] Vi_neighs -= Vi_neighs.mean(1)[:, None] R[i] = np.mean(np.einsum('ij, j', Vi_neighs, V[i]) / (norm(Vi_neighs) * V_norm[i])[None, :]) adata.obs[vkey + '_length'] = V_norm.round(2) adata.obs[vkey + '_confidence'] = R logg.hint('added \'' + vkey + '_confidence\' (adata.obs)') if vkey + '_confidence_transition' not in adata.obs.keys(): velocity_confidence_transition(adata, vkey) return adata if copy else None def velocity_confidence_transition(data, vkey='velocity', scale=10, copy=False): """Computes confidences of velocity transitions. Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Name of velocity estimates to be used. scale: `float` (default: 10) Scale parameter of gaussian kernel. copy: `bool` (default: `False`) Return a copy instead of writing to adata. Returns ------- Returns or updates `adata` with the attributes velocity_confidence_transition: `.obs` Confidence of transition for each cell """ adata = data.copy() if copy else data if vkey not in adata.layers.keys(): raise ValueError( 'You need to run `tl.velocity` first.') idx = np.array(adata.var[vkey + '_genes'].values, dtype=bool) T = transition_matrix(adata, vkey=vkey, scale=scale) dX = T.dot(adata.layers['Ms'][:, idx]) - adata.layers['Ms'][:, idx] dX -= dX.mean(1)[:, None] V = adata.layers[vkey][:, idx].copy() V -= V.mean(1)[:, None] adata.obs[vkey + '_confidence_transition'] = prod_sum_var(dX, V) / (norm(dX) * norm(V)) logg.hint('added \'' + vkey + '_confidence_transition\' (adata.obs)') return adata if copy else None def random_subsample(adata, frac=.5): subset = np.random.choice([True, False], size=adata.n_obs, p=[frac, 1-frac]).sum() adata.obs['subset'] = subset adata_subset = adata[subset].copy() neighbors(adata_subset) moments(adata_subset) return adata_subset def score_robustness(data, adata_subset=None, vkey='velocity', copy=False): adata = data.copy() if copy else data if adata_subset is None: adata_subset = random_subsample(adata) V = adata[adata.obs['subset']].layers[vkey] V_subset = adata_subset.layers[vkey] adata_subset.obs[vkey + '_score_robustness'] = prod_sum_var(V, V_subset) / (norm(V) * norm(V_subset)) return adata_subset if copy else None PK@UNkkVV"scvelo/tools/velocity_embedding.pyfrom .. import settings from .. 
import logging as logg from .utils import norm from .transition_matrix import transition_matrix from scipy.sparse import issparse import numpy as np import warnings def quiver_autoscale(X_emb, V_emb): import matplotlib.pyplot as pl scale_factor = X_emb.max() # just so that it handles very large values Q = pl.quiver(X_emb[:, 0] / scale_factor, X_emb[:, 1] / scale_factor, V_emb[:, 0], V_emb[:, 1], angles='xy', scale_units='xy', scale=None) Q._init() pl.clf() return Q.scale / scale_factor def velocity_embedding(data, basis=None, vkey='velocity', scale=10, self_transitions=True, use_negative_cosines=True, direct_projection=None, pca_transform=None, retain_scale=False, autoscale=True, all_comps=True, T=None, copy=False): """Computes the single cell velocities in the embedding Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. basis: `str` (default: `'tsne'`) Which embedding to use. vkey: `str` (default: `'velocity'`) Name of velocity estimates to be used. scale: `int` (default: 10) Scale parameter of gaussian kernel for transition matrix. self_transitions: `bool` (default: `True`) Whether to allow self transitions, based on the confidences of transitioning to neighboring cell. use_negative_cosines: `bool` (default: `True`) Whether to use not only positive, but also negative cosines and use those transitions to the opposite way. direct_projection: `bool` (default: `True`) Whether to directly project the velocities into PCA space, thus skipping velocity graph. pca_transform: `bool` (default: `None`) same as direct_projection (deprecated) retain_scale: `bool` (default: `False`) Whether to retain scale from high dimensional space in embedding. autoscale: `bool` (default: `True`) Whether to scale the embedded velocities by a scalar multiplier, which simply ensures that the arrows in the embedding are properly scaled. all_comps: `bool` (default: `True`) Whether to compute the velocities on all embedding components or just the first two. T: `csr_matrix` (default: `None`) Allows the user to directly pass a transition matrix. 
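Examples
--------
A minimal sketch (assuming this function is exported under `scv.tl`, a velocity
graph has been computed, and the respective embeddings exist in `adata.obsm`):

>>> import scvelo as scv
>>> scv.tl.velocity_graph(adata)
>>> scv.tl.velocity_embedding(adata, basis='umap')   # adds adata.obsm['velocity_umap']
>>> scv.tl.velocity_embedding(adata, basis='pca', direct_projection=True)  # adds 'velocity_pca'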
Returns ------- Returns or updates `adata` with the attributes velocity_basis: `.obsm` coordinates of velocity projection on embedding """ adata = data.copy() if copy else data if basis is None: keys = [key for key in ['pca', 'tsne', 'umap'] if 'X_' + key in adata.obsm.keys()] if len(keys) > 0: basis = keys[-1] else: raise ValueError('No basis specified') if 'X_' + basis not in adata.obsm_keys(): raise ValueError('You need compute the embedding first.') logg.info('computing velocity embedding', r=True) if pca_transform is None and direct_projection is None: pca_transform = True if 'pca' in basis else False if 'pca' in basis and (direct_projection or pca_transform): V = adata.layers[vkey] PCs = adata.varm['PCs'] if all_comps else adata.varm['PCs'][:, :2] if vkey + '_genes' in adata.var.keys(): vgenes = np.array(adata.var[vkey + '_genes'], dtype=bool) V = V[:, vgenes] PCs = PCs[vgenes] nans = np.isnan(V.sum(0)) if np.any(nans): V = V[:, ~nans] PCs = PCs[~nans] X_emb = adata.obsm['X_' + basis] V_emb = (V - V.mean(0)).dot(PCs) else: X_emb = adata.obsm['X_' + basis] if all_comps else adata.obsm['X_' + basis][:, :2] V_emb = np.zeros(X_emb.shape) T = transition_matrix(adata, vkey=vkey, scale=scale, self_transitions=self_transitions, use_negative_cosines=use_negative_cosines) if T is None else T T.setdiag(0) T.eliminate_zeros() densify = adata.n_obs < 1e4 TA = T.A if densify else None with warnings.catch_warnings(): warnings.simplefilter("ignore") for i in range(adata.n_obs): indices = T[i].indices dX = X_emb[indices] - X_emb[i, None] # shape (n_neighbors, 2) if not retain_scale: dX /= norm(dX)[:, None] dX[np.isnan(dX)] = 0 # zero diff in a steady-state probs = TA[i, indices] if densify else T[i].data V_emb[i] = probs.dot(dX) - probs.mean() * dX.sum(0) # probs.sum() / len(indices) if retain_scale: delta = T.dot(adata.X) - adata.X if issparse(delta): delta = delta.A cos_proj = (adata.layers[vkey] * delta).sum(1) / norm(delta) V_emb *= np.clip(cos_proj[:, None] * 10, 0, 1) if autoscale: V_emb /= (3 * quiver_autoscale(X_emb, V_emb)) vkey += '_' + basis adata.obsm[vkey] = V_emb logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added\n' ' \'' + vkey + '\', embedded velocity vectors (adata.obsm)') return adata if copy else None PKl[Nuzsscvelo/tools/velocity_graph.pyfrom .. import settings from .. 
import logging as logg from ..preprocessing.neighbors import pca, neighbors from .utils import cosine_correlation, get_indices, get_iterative_indices from .velocity import velocity from scipy.sparse import coo_matrix, csr_matrix, issparse import numpy as np def vals_to_csr(vals, rows, cols, shape, split_negative=False): graph = coo_matrix((vals, (rows, cols)), shape=shape) if split_negative: graph_neg = graph.copy() graph.data = np.clip(graph.data, 0, 1) graph_neg.data = np.clip(graph_neg.data, -1, 0) graph.eliminate_zeros() graph_neg.eliminate_zeros() return graph.tocsr(), graph_neg.tocsr() else: return graph.tocsr() class VelocityGraph: def __init__(self, adata, vkey='velocity', xkey='Ms', tkey=None, basis=None, n_neighbors=None, sqrt_transform=False, n_recurse_neighbors=None, random_neighbors_at_max=None, approx=False, report=False): subset = np.array(adata.var[vkey + '_genes'].values, dtype=bool) \ if vkey + '_genes' in adata.var.keys() else np.ones(adata.n_vars, bool) X = adata.layers[xkey].A[:, subset] if issparse(adata.layers[xkey]) else adata.layers[xkey][:, subset] V = adata.layers[vkey].A[:, subset] if issparse(adata.layers[vkey]) else adata.layers[vkey][:, subset] if approx is True and X.shape[1] > 100: X_pca, PCs, _, _ = pca(X, n_comps=30, svd_solver='arpack', return_info=True) self.X = np.array(X_pca, dtype=np.float32) self.V = (V - V.mean(0)).dot(PCs.T) self.V[V.sum(1) == 0] = 0 else: self.X = np.array(X, dtype=np.float32) self.V = np.array(V, dtype=np.float32) self.sqrt_transform = sqrt_transform if sqrt_transform: self.V = np.sqrt(np.abs(self.V)) * np.sign(self.V) self.V -= self.V.mean(1)[:, None] self.n_recurse_neighbors = 1 if n_neighbors is not None \ else 2 if n_recurse_neighbors is None else n_recurse_neighbors if 'neighbors' not in adata.uns.keys(): neighbors(adata) if n_neighbors is None or n_neighbors < adata.uns['neighbors']['params']['n_neighbors']: self.indices = get_indices(dist=adata.uns['neighbors']['distances'], n_neighbors=n_neighbors)[0] else: if basis is None: basis = [key for key in ['X_pca', 'X_tsne', 'X_umap'] if key in adata.obsm.keys()][-1] elif 'X_' + basis in adata.obsm.keys(): basis = 'X_' + basis if isinstance(approx, str) and approx in adata.obsm.keys(): from sklearn.neighbors import NearestNeighbors neighs = NearestNeighbors(n_neighbors=n_neighbors + 1) neighs.fit(adata.obsm[approx][:, :2]) self.indices = neighs.kneighbors_graph(mode='connectivity').indices.reshape((-1, n_neighbors + 1)) else: from .. 
import Neighbors neighs = Neighbors(adata) neighs.compute_neighbors(n_neighbors=n_neighbors, use_rep=basis, n_pcs=10) self.indices = get_indices(dist=neighs.distances)[0] self.max_neighs = random_neighbors_at_max self.graph = adata.uns[vkey + '_graph'] if vkey + '_graph' in adata.uns.keys() else [] self.graph_neg = adata.uns[vkey + '_graph_neg'] if vkey + '_graph_neg' in adata.uns.keys() else [] if tkey in adata.obs.keys(): self.t0 = adata.obs[tkey].copy() init = min(self.t0) if isinstance(min(self.t0), int) else 0 self.t0.cat.categories = np.arange(init, len(self.t0.cat.categories)) self.t1 = self.t0.copy() self.t1.cat.categories = self.t0.cat.categories + 1 else: self.t0 = None self.report = report def compute_cosines(self): vals, rows, cols, n_obs = [], [], [], self.X.shape[0] progress = logg.ProgressReporter(n_obs) for i in range(n_obs): neighs_idx = get_iterative_indices(self.indices, i, self.n_recurse_neighbors, self.max_neighs) if self.t0 is not None: t0, t1 = self.t0[i], self.t1[i] if t0 >= 0 and t1 > 0: t1_idx = np.where(self.t0 == t1)[0] if len(t1_idx) > len(neighs_idx): t1_idx = np.random.choice(t1_idx, len(neighs_idx), replace=False) if len(t1_idx) > 0: neighs_idx = np.unique(np.concatenate([neighs_idx, t1_idx])) if self.V[i].max() != 0 or self.V[i].min() != 0: dX = self.X[neighs_idx] - self.X[i, None] # 60% of runtime if self.sqrt_transform: dX = np.sqrt(np.abs(dX)) * np.sign(dX) val = cosine_correlation(dX, self.V[i]) # 40% of runtime else: val = np.zeros(len(neighs_idx)) vals.extend(val) rows.extend(np.ones(len(neighs_idx)) * i) cols.extend(neighs_idx) if self.report: progress.update() if self.report: progress.finish() vals = np.hstack(vals) vals[np.isnan(vals)] = 0 self.graph, self.graph_neg = vals_to_csr(vals, rows, cols, shape=(n_obs, n_obs), split_negative=True) def velocity_graph(data, vkey='velocity', xkey='Ms', tkey=None, basis=None, n_neighbors=None, n_recurse_neighbors=None, random_neighbors_at_max=None, sqrt_transform=False, approx=False, copy=False): """Computes velocity graph based on cosine similarities. The cosine similarities are computed between velocities and potential cell state transitions. Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. vkey: `str` (default: `'velocity'`) Name of velocity estimates to be used. n_neighbors: `int` or `None` (default: None) Use fixed number of neighbors or do recursive neighbor search (if `None`). n_recurse_neighbors: `int` (default: 2) Number of recursions to be done for neighbors search. random_neighbors_at_max: `int` or `None` (default: `None`) If number of iterative neighbors for an individual cell is higher than this threshold, a random selection of such are chosen as reference neighbors. sqrt_transform: `bool` (default: `False`) Whether to variance-transform the cell states changes and velocities before computing cosine similarities. copy: `bool` (default: `False`) Return a copy instead of writing to adata. 
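Examples
--------
A minimal sketch (assuming this function is exported under `scv.tl` and a
velocity layer has been computed). Internally, for each cell the displacements
to its (recursive) neighbors are compared with its velocity vector by cosine
correlation, and positive and negative correlations are stored separately:

>>> import scvelo as scv
>>> scv.tl.velocity(adata)
>>> scv.tl.velocity_graph(adata, n_recurse_neighbors=2)
>>> adata.uns['velocity_graph']        # sparse matrix of positive cosine correlations
>>> adata.uns['velocity_graph_neg']    # negative cosine correlations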
Returns ------- Returns or updates `adata` with the attributes velocity_graph: `.uns` sparse matrix with transition probabilities """ adata = data.copy() if copy else data if vkey not in adata.layers.keys(): velocity(adata, vkey=vkey) vgraph = VelocityGraph(adata, vkey=vkey, xkey=xkey, tkey=tkey, basis=basis, n_neighbors=n_neighbors, approx=approx, n_recurse_neighbors=n_recurse_neighbors, random_neighbors_at_max=random_neighbors_at_max, sqrt_transform=sqrt_transform, report=True) logg.info('computing velocity graph', r=True) vgraph.compute_cosines() adata.uns[vkey+'_graph'] = vgraph.graph adata.uns[vkey+'_graph_neg'] = vgraph.graph_neg logg.info(' finished', time=True, end=' ' if settings.verbosity > 2 else '\n') logg.hint( 'added \n' ' \'' + vkey + '_graph\', sparse matrix with cosine correlations (adata.uns)') return adata if copy else None PK4N +ee#scvelo/tools/velocity_pseudotime.pyimport numpy as np from scipy.sparse import issparse, spdiags, linalg from .utils import groups_to_bool, scale, strings_to_categoricals from ..preprocessing.moments import get_connectivities try: from scanpy.tools.dpt import DPT except ImportError: try: from scanpy.tools._dpt import DPT except ImportError: pass def principal_curve(data, basis='pca', n_comps=4, clusters_list=None, copy=False): """Computes the principal curve Arguments --------- data: :class:`~anndata.AnnData` Annotated data matrix. basis: `str` (default: `'pca'`) Basis to use for computing the principal curve. n_comps: `int` (default: 4) Number of pricipal components to be used. copy: `bool`, (default: `False`) Return a copy instead of writing to adata. Returns ------- Returns or updates `adata` with the attributes principal_curve: `.uns` dictionary containing `projections`, `ixsort` and `arclength` """ adata = data.copy() if copy else data import rpy2.robjects as robjects from rpy2.robjects.packages import importr if clusters_list is not None: cell_subset = np.array([label in clusters_list for label in adata.obs['clusters']]) X_emb = adata[cell_subset].obsm['X_' + basis][:, :n_comps] else: cell_subset = None X_emb = adata.obsm['X_' + basis][:, :n_comps] n_obs, n_dim = X_emb.shape # convert array to R matrix xvec = robjects.FloatVector(X_emb.T.reshape((X_emb.size))) X_R = robjects.r.matrix(xvec, nrow=n_obs, ncol=n_dim) fit = importr("princurve").principal_curve(X_R) adata.uns['principal_curve'] = dict() adata.uns['principal_curve']['ixsort'] = ixsort = np.array(fit[1])-1 adata.uns['principal_curve']['projections'] = np.array(fit[0])[ixsort] adata.uns['principal_curve']['arclength'] = np.array(fit[2]) adata.uns['principal_curve']['cell_subset'] = cell_subset return adata if copy else None def velocity_map(adata=None, T=None, n_dcs=10, return_model=False): vpt = VPT(adata, n_dcs=n_dcs) if T is None: T = adata.uns['velocity_graph'] - adata.uns['velocity_graph_neg'] vpt._connectivities = T + T.T vpt.compute_transitions() vpt.compute_eigen(n_dcs) adata.obsm['X_vmap'] = vpt.eigen_basis return vpt if return_model else None class VPT(DPT): def set_iroot(self, root=None): if isinstance(root, str) and root in self._adata.obs.keys() and self._adata.obs[root].max() != 0: self.iroot = scale(get_connectivities(self._adata).dot(self._adata.obs[root])).argmax() elif isinstance(root, str) and root in self._adata.obs_names: self.iroot = np.where(self._adata.obs_names == root)[0][0] elif isinstance(root, (int, np.integer)) and root < self._adata.n_obs: self.iroot = root else: self.iroot = None def compute_transitions(self, density_normalize=True): T = 
self._connectivities if density_normalize: q = np.asarray(T.sum(axis=0)) Q = spdiags(1.0 / q, 0, T.shape[0], T.shape[0]) if issparse(T) else np.diag(1.0 / q) K = Q.dot(T).dot(Q) else: K = T z = np.sqrt(np.asarray(K.sum(axis=0))) Z = spdiags(1.0 / z, 0, K.shape[0], K.shape[0]) if issparse(K) else np.diag(1.0 / z) self._transitions_sym = Z.dot(K).dot(Z) def compute_eigen(self, n_comps=10, sym=None, sort='decrease'): if self._transitions_sym is None: raise ValueError('Run `.compute_transitions` first.') n_comps = min(self._transitions_sym.shape[0] - 1, n_comps) evals, evecs = linalg.eigsh(self._transitions_sym, k=n_comps, which='LM') self._eigen_values = evals[::-1] self._eigen_basis = evecs[:, ::-1] def compute_pseudotime(self, inverse=False): if self.iroot is not None: self._set_pseudotime() self.pseudotime = 1 - self.pseudotime if inverse else self.pseudotime self.pseudotime[~np.isfinite(self.pseudotime)] = np.nan else: self.pseudotime = np.empty(self._adata.n_obs) self.pseudotime[:] = np.nan def velocity_pseudotime(adata, groupby=None, groups=None, root=None, end=None, n_dcs=10, n_branchings=0, min_group_size=0.01, allow_kendall_tau_shift=True, use_velocity_field=True, save_diffmap=False, return_model=False): strings_to_categoricals(adata) root = 'root_cells' if root is None and 'root_cells' in adata.obs.keys() else root end = 'end_points' if end is None and 'end_points' in adata.obs.keys() else end groupby = 'cell_fate' if groupby is None and 'cell_fate' in adata.obs.keys() else groupby categories = adata.obs[groupby].cat.categories if groupby is not None and groups is None else [None] for cat in categories: groups = cat if cat is not None else groups cell_subset = groups_to_bool(adata, groups=groups, groupby=groupby) data = adata.copy() if cell_subset is None else adata[cell_subset].copy() vpt = VPT(data, n_dcs=n_dcs, min_group_size=min_group_size, n_branchings=n_branchings, allow_kendall_tau_shift=allow_kendall_tau_shift) if use_velocity_field: T = data.uns['velocity_graph'] - data.uns['velocity_graph_neg'] vpt._connectivities = T + T.T vpt.compute_transitions() vpt.compute_eigen(n_comps=n_dcs) vpt.set_iroot(root) vpt.compute_pseudotime() dpt_root = vpt.pseudotime vpt.set_iroot(end) vpt.compute_pseudotime(inverse=True) dpt_end = vpt.pseudotime # merge dpt_root and inverse dpt_end together vpt.pseudotime = np.nan_to_num(dpt_root) + np.nan_to_num(dpt_end) vpt.pseudotime[np.isfinite(dpt_root) & np.isfinite(dpt_end)] /= 2 vpt.pseudotime = scale(vpt.pseudotime) vpt.pseudotime[np.isnan(dpt_root) & np.isnan(dpt_end)] = np.nan if n_branchings > 0: vpt.branchings_segments() else: vpt.indices = vpt.pseudotime.argsort() if 'velocity_pseudotime' not in adata.obs.keys(): pseudotime = np.empty(adata.n_obs) pseudotime[:] = np.nan else: pseudotime = adata.obs['velocity_pseudotime'].copy() pseudotime[cell_subset] = vpt.pseudotime adata.obs['velocity_pseudotime'] = pseudotime if save_diffmap: diffmap = np.empty(shape=(adata.n_obs, n_dcs)) diffmap[:] = np.nan diffmap[cell_subset] = vpt.eigen_basis adata.obsm['X_diffmap_' + groups] = diffmap return vpt if return_model else None PKwMm9}/scvelo-0.1.17.dist-info/LICENSEBSD 3-Clause License Copyright (c) 2018, Theis Lab All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
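# ---------------------------------------------------------------------------
# End-to-end sketch tying the tools in this package together (illustrative;
# assumes the functions are exported under `scv.tl` and downloads the builtin
# dentate gyrus dataset, which ships with a UMAP embedding). Cell 0 is used as
# an arbitrary root for the pseudotime, purely for illustration.

import scvelo as scv

adata = scv.datasets.dentategyrus()
scv.tl.velocity(adata, mode='stochastic')       # velocity vectors per cell
scv.tl.velocity_graph(adata)                    # cosine-correlation transition graph
scv.tl.velocity_embedding(adata, basis='umap')  # project velocities onto the UMAP
scv.tl.velocity_confidence(adata)               # per-cell length and confidence
scv.tl.velocity_pseudotime(adata, root=0)       # adds adata.obs['velocity_pseudotime']
# ---------------------------------------------------------------------------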