import pathlib
import subprocess
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import combinations
from typing import List
import anndata
import numpy as np
import pandas as pd
import scanpy as sc
import xarray as xr
from sklearn.metrics import pairwise_distances
from sklearn.metrics import roc_auc_score
from ..mcds import MCDS
def _single_pairwise_dmg(
    cluster_l,
    cluster_r,
    top_n,
    adj_p_cutoff,
    delta_rate_cutoff,
    auroc_cutoff,
    adata_dir,
    dmg_dir,
):
    """Calculate DMGs between a pair of cluster AnnData files.

    The filtered DMG table (adjusted P, delta fraction and AUROC cutoffs
    applied) is saved to ``{dmg_dir}/{cluster_l}-{cluster_r}.hdf``;
    nothing is returned. If the output file already exists, the pair is
    skipped so interrupted runs can resume.

    Parameters
    ----------
    cluster_l :
        name of the left cluster; ``{adata_dir}/{cluster_l}.h5ad`` must exist.
    cluster_r :
        name of the right cluster; ``{adata_dir}/{cluster_r}.h5ad`` must exist.
    top_n :
        number of genes reported by ``sc.tl.rank_genes_groups``.
    adj_p_cutoff :
        adjusted P value cutoff to report significant DMG.
    delta_rate_cutoff :
        absolute mC fraction delta cutoff to report significant DMG.
    auroc_cutoff :
        AUROC cutoff to report significant DMG.
    adata_dir :
        directory containing the per-cluster AnnData files.
    dmg_dir :
        directory where the pairwise DMG table is saved.
    """
    output_path = f"{dmg_dir}/{cluster_l}-{cluster_r}.hdf"
    if pathlib.Path(output_path).exists():
        # result computed in a previous (possibly interrupted) run
        return

    # load data
    adata_l = anndata.read_h5ad(f"{adata_dir}/{cluster_l}.h5ad")
    adata_r = anndata.read_h5ad(f"{adata_dir}/{cluster_r}.h5ad")

    # generate single adata for DMG
    adata = adata_l.concatenate(adata_r, batch_key="groups", index_unique=None)
    adata.obs = pd.concat([adata_l.obs, adata_r.obs])
    adata.obs['groups'] = adata.obs['groups'].astype('category')
    # explicit raise instead of `assert` so the check survives `python -O`;
    # AssertionError kept so existing callers catch the same exception type
    if adata.obs_names.duplicated().sum() != 0:
        print(cluster_l, cluster_r)
        raise AssertionError(
            f"duplicated cell names between cluster {cluster_l} and {cluster_r}"
        )

    # reverse_adata, centered by 1 because after normalization all prior is center to 1
    adata.X = (adata.X - 1) * -1 + 1

    # calc DMG via scanpy's wilcoxon rank test
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        sc.tl.rank_genes_groups(
            adata, groupby="groups", n_genes=top_n, method="wilcoxon"
        )
    dmg_result = pd.DataFrame(
        {
            data_key: pd.DataFrame(
                adata.uns["rank_genes_groups"][data_key], columns=[cluster_r, cluster_l]
            ).stack()
            for data_key in ["names", "pvals_adj"]
        }
    )
    dmg_result.reset_index(drop=True, inplace=True)

    # annotate cluster delta = mean mC fraction (left) - mean mC fraction (right)
    dmg_result["left-right"] = f"{cluster_l}-{cluster_r}"
    l_mean = pd.Series(np.mean(adata_l.X, axis=0), index=adata_l.var_names)
    left_mean = dmg_result["names"].map(l_mean)
    # BUGFIX: index the right-cluster means with adata_r.var_names
    # (was adata_l.var_names — a copy-paste slip; identical only if both
    # files share the exact same var order)
    r_mean = pd.Series(np.mean(adata_r.X, axis=0), index=adata_r.var_names)
    right_mean = dmg_result["names"].map(r_mean)
    dmg_result["delta"] = left_mean - right_mean

    # filter by adjusted P value and absolute delta
    dmg_result = dmg_result[
        (dmg_result["pvals_adj"] < adj_p_cutoff)
        & (dmg_result["delta"].abs() > delta_rate_cutoff)
    ].copy()
    # negative delta -> left cluster has the lower (hypo) mC fraction
    dmg_result["hypo_in"] = dmg_result["delta"].apply(
        lambda i: cluster_l if i < 0 else cluster_r
    )
    dmg_result["hyper_in"] = dmg_result["delta"].apply(
        lambda i: cluster_r if i < 0 else cluster_l
    )
    dmg_result = dmg_result.set_index("names").drop_duplicates()

    # add AUROC (folded into [0.5, 1] so direction does not matter) and filter again
    auroc = {}
    for gene, row in dmg_result.iterrows():
        yscore = adata.obs_vector(gene)
        ylabel = adata.obs["groups"] == row["hypo_in"]
        score = roc_auc_score(ylabel, yscore)
        score = abs(score - 0.5) + 0.5
        auroc[gene] = score
    dmg_result["AUROC"] = pd.Series(auroc)
    dmg_result = dmg_result[(dmg_result["AUROC"] > auroc_cutoff)].copy()

    # save
    dmg_result.to_hdf(output_path, key="data")
    return
class PairwiseDMG:
    """Pairwise differentially methylated gene (DMG) calculator.

    Call :meth:`fit_predict` with a cell-by-feature matrix and cluster
    labels; the combined result table is stored in ``self.dmg_table``.
    """

    def __init__(
        self,
        max_cell_per_group=1000,
        top_n=10000,
        adj_p_cutoff=0.001,
        delta_rate_cutoff=0.3,
        auroc_cutoff=0.9,
        random_state=0,
        n_jobs=1,
        verbose=True,
    ):
        """
        Calculate pairwise DMGs. After calculation, results saved in self.dmg_table

        Parameters
        ----------
        max_cell_per_group :
            maximum number of cells to use for each group, downsample larger groups to this size
        top_n :
            top N DMGs to report in the final result
        adj_p_cutoff :
            adjusted P value cutoff to report significant DMG
        delta_rate_cutoff :
            mC fraction delta cutoff to report significant DMG
        auroc_cutoff :
            AUROC cutoff to report significant DMG
        random_state :
            overall random state to make sure reproducibility
        n_jobs :
            number of cpus
        verbose :
            whether to print progress messages
        """
        # data and results, filled in by fit_predict
        self.X = None
        self.groups = None
        self._obs_dim = ""
        self._var_dim = ""
        self.dmg_table = None
        self._outlier_label = None
        self.selected_pairs = None

        # parameters
        self.max_cell_per_group = max_cell_per_group
        self.top_n = top_n
        self.adj_p_cutoff = adj_p_cutoff
        self.delta_rate_cutoff = delta_rate_cutoff
        self.auroc_cutoff = auroc_cutoff
        self.random_state = random_state
        self.n_jobs = n_jobs
        self.verbose = verbose

        # internal directories for intermediate per-cluster/per-pair files
        self._adata_dir = "_adata_for_dmg"
        self._dmg_dir = "_dmg_results"

    def fit_predict(
        self,
        x,
        groups,
        var_dim,
        obs_dim="cell",
        outlier="Outlier",
        cleanup=True,
        selected_pairs: List[tuple] = None,
    ):
        """
        provide data and perform the pairwise DMG

        Parameters
        ----------
        x :
            2D cell-by-feature xarray.DataArray
        groups :
            cluster labels
        obs_dim :
            name of the cell dim
        var_dim :
            name of the feature dim
        outlier :
            name of the outlier group, if provided, will ignore this label
        cleanup :
            Whether to delete the group adata file
        selected_pairs :
            By default, pairwise DMG will calculate all possible pairs between all the groups, which might be very
            time consuming if the group number is large. With this parameter, you may provide a list of cluster pairs

        Returns
        -------
        """
        if (len(x.shape) != 2) or not isinstance(x, xr.DataArray):
            # typo fixed: "an cell-by-feature" -> "a cell-by-feature"
            raise ValueError(
                "Expect a cell-by-feature 2D xr.DataArray as input matrix."
            )
        self._obs_dim = obs_dim
        self._var_dim = var_dim
        self._outlier_label = outlier
        self.X = x
        # align the group labels to the cell order of the matrix
        use_order = x.get_index(obs_dim)
        self.groups = groups.astype("category").reindex(use_order)
        self.selected_pairs = selected_pairs

        # save adata for each group to separate files
        if self.verbose:
            print("Generating cluster AnnData files")
        self._save_cluster_adata()

        # run pairwise DMG
        if self.verbose:
            print("Computing pairwise DMG")
        self._pairwise_dmg()

        # cleanup intermediate files
        if cleanup:
            self._cleanup()

    def _save_cluster_adata(self):
        """Save each group into separate adata, this way reduce the memory during parallel"""
        if self.selected_pairs is not None:
            # only save the clusters that actually appear in a selected pair
            use_label = set()
            for a, b in self.selected_pairs:
                use_label.add(a)
                use_label.add(b)
        else:
            use_label = None
        adata_dir = pathlib.Path(self._adata_dir)
        adata_dir.mkdir(exist_ok=True)
        total_cells = self.X.get_index(self._obs_dim)
        for cluster, sub_series in self.groups.groupby(self.groups):
            if (use_label is not None) and (cluster not in use_label):
                continue
            if cluster == self._outlier_label:
                # skip outlier
                continue
            output_path = adata_dir / f"{cluster}.h5ad"
            if output_path.exists():
                # reuse file from a previous run
                continue
            sub_series = sub_series.cat.remove_unused_categories()
            # downsample large clusters for speed / memory
            if sub_series.size > self.max_cell_per_group:
                sub_series = sub_series.sample(
                    self.max_cell_per_group, random_state=self.random_state
                )
            cluster_adata = anndata.AnnData(
                X=self.X.sel(
                    {self._obs_dim: total_cells.intersection(sub_series.index)}
                ).values,
                obs=pd.DataFrame({"groups": sub_series.astype("category")}),
                var=pd.DataFrame([], index=self.X.get_index(self._var_dim)),
            )
            cluster_adata.write_h5ad(output_path)
        return

    def _pairwise_dmg(self):
        """pairwise DMG runner, result save to self.dmg_table"""
        dmg_dir = pathlib.Path(self._dmg_dir)
        dmg_dir.mkdir(exist_ok=True)
        if self.selected_pairs is None:
            # all cluster pairs, excluding any pair that involves the outlier label
            pairs = [
                i
                for i in combinations(sorted(self.groups.unique()), 2)
                if self._outlier_label not in i
            ]
        else:
            if self.verbose:
                print("Using user provided gene pairs")
            pairs = self.selected_pairs
        if self.verbose:
            print(len(pairs), "pairwise DMGs")
        n_pairs = len(pairs)
        with ProcessPoolExecutor(min(n_pairs, self.n_jobs)) as exe:
            step = max(1, n_pairs // 10)  # progress report interval
            futures = {}
            for (cluster_l, cluster_r) in pairs:
                f = exe.submit(
                    _single_pairwise_dmg,
                    cluster_l=cluster_l,
                    cluster_r=cluster_r,
                    top_n=self.top_n,
                    adj_p_cutoff=self.adj_p_cutoff,
                    delta_rate_cutoff=self.delta_rate_cutoff,
                    auroc_cutoff=self.auroc_cutoff,
                    adata_dir=self._adata_dir,
                    dmg_dir=self._dmg_dir,
                )
                futures[f] = (cluster_l, cluster_r)
            for i, f in enumerate(as_completed(futures)):
                # propagate any worker exception
                f.result()
                if i % step == 0 and self.verbose:
                    print(f"{i + 1}/{n_pairs} finished")
        # summarize: concatenate every per-pair HDF table
        self.dmg_table = pd.concat((pd.read_hdf(p) for p in dmg_dir.glob("*.hdf")))
        return

    def _cleanup(self):
        """Delete the intermediate adata and DMG result directories."""
        # shutil.rmtree is portable and avoids shelling out to `rm -rf`
        import shutil
        for path in (self._adata_dir, self._dmg_dir):
            shutil.rmtree(path, ignore_errors=True)

    def aggregate_pairwise_dmg(self, adata, groupby, obsm="X_pca"):
        """
        Aggregate pairwise DMG results for each cluster, rank DMG for the cluster by the sum of
        AUROC * cluster_pair_similarity
        This way, the DMGs having large AUROC between similar clusters get more weights

        Parameters
        ----------
        adata :
            AnnData with the cluster labels in obs and an embedding in obsm
        groupby :
            obs column name of the cluster labels
        obsm :
            obsm key of the embedding used to compute cluster similarity

        Returns
        -------
        dict mapping cluster name to a ranked pd.Series of aggregated DMG scores
        """
        # using the cluster centroids in PC space to calculate cluster similarity
        pc_matrix = adata.obsm[obsm]
        pc_center = (
            pd.DataFrame(pc_matrix, index=adata.obs_names)
            .groupby(adata.obs[groupby])
            .median()
        )
        # calculate cluster pairwise similarity (1 - normalized distance)
        cluster_dist = pairwise_distances(pc_center)
        cluster_dist = pd.DataFrame(
            cluster_dist, index=pc_center.index, columns=pc_center.index
        )
        cluster_dist_norm = cluster_dist / cluster_dist.values.max()
        cluster_sim = 1 - cluster_dist_norm
        # BUGFIX: Series.iteritems() was removed in pandas 2.0 -> use items()
        cluster_pair_sim_dict = {
            f"{a}-{b}": value for (a, b), value in cluster_sim.unstack().items()
        }
        self.dmg_table["similarity"] = self.dmg_table["left-right"].map(
            cluster_pair_sim_dict
        )
        # aggregate pairwise DMG to get the cluster level DMG, use the similarity to normalize AUROC
        cluster_dmgs = {}
        for cluster, sub_df in self.dmg_table.groupby("hypo_in"):
            values = sub_df["AUROC"] * sub_df["similarity"]
            cluster_dmg_order = (
                values.groupby(values.index).sum().sort_values(ascending=False)
            )
            cluster_dmgs[cluster] = cluster_dmg_order
        return cluster_dmgs
def _single_ovr_dmg(
    cell_label,
    mcds,
    obs_dim,
    var_dim,
    mc_type,
    top_n,
    adj_p_cutoff,
    fc_cutoff,
    auroc_cutoff,
):
    """Compute one-vs-rest DMGs for a single cluster.

    ``cell_label`` is a boolean series indexed by cell name (True = cell
    belongs to the target cluster). Returns the filtered DMG table
    indexed by gene name, with ``fc`` and ``AUROC`` columns.
    """
    # build an AnnData restricted to the labelled cells
    selected = mcds.get_index(obs_dim).isin(cell_label.index)
    adata = mcds.sel({obs_dim: selected}).get_adata(
        mc_type=mc_type, var_dim=var_dim, select_hvf=False
    )
    adata.var = pd.DataFrame([], index=adata.var_names)
    adata.obs["groups"] = cell_label.astype("category")

    # rank genes with the wilcoxon test, silencing scanpy warnings
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        sc.tl.rank_genes_groups(
            adata, groupby="groups", n_genes=top_n, method="wilcoxon"
        )
    rank_data = adata.uns["rank_genes_groups"]
    table = pd.DataFrame(
        {
            key: pd.DataFrame(rank_data[key]).stack()
            for key in ["names", "pvals_adj"]
        }
    )
    # keep only the in-group (True) column of the stacked frame
    in_group_rows = table.index.get_level_values(1).astype(bool)
    table = table[in_group_rows].reset_index(drop=True)

    # fold change of mean mC fraction: in-cluster vs rest
    in_mask = adata.obs["groups"].astype(bool)
    mean_in = adata.X[
        in_mask,
    ].mean(axis=0)
    mean_out = adata.X[
        ~in_mask,
    ].mean(axis=0)
    fold_change = pd.Series(mean_in / mean_out, index=adata.var_names)
    table["fc"] = table["names"].map(fold_change)

    # filter by adjusted P value and fold change
    table = table[
        (table["pvals_adj"] < adj_p_cutoff) & (table["fc"] < fc_cutoff)
    ].copy()
    table = table.set_index("names").drop_duplicates()

    # AUROC (folded into [0.5, 1]) as the final filter;
    # the label vector is loop-invariant, so compute it once
    in_group_label = adata.obs["groups"] == True
    gene_scores = {}
    for gene in table.index:
        gene_values = adata.obs_vector(gene)
        raw_auc = roc_auc_score(in_group_label, gene_values)
        gene_scores[gene] = abs(raw_auc - 0.5) + 0.5
    table["AUROC"] = pd.Series(gene_scores)
    return table[table["AUROC"] > auroc_cutoff].copy()
def _one_vs_rest_dmr_runner(
    cell_meta,
    group,
    cluster,
    max_cluster_cells,
    max_other_fold,
    mcds_paths,
    obs_dim,
    var_dim,
    mc_type,
    top_n,
    adj_p_cutoff,
    fc_cutoff,
    auroc_cutoff,
    verbose=True,
):
    """Select in/out cells for one cluster and run the one-vs-rest DMG test."""
    if verbose:
        print(f"Calculating cluster {cluster} DMGs.")
    mcds = MCDS.open(mcds_paths, obs_dim=obs_dim, var_dim=var_dim)

    # boolean label: True = cell belongs to the target cluster
    is_in_cluster = cell_meta[group] == cluster
    in_cells = is_in_cluster[is_in_cluster]
    out_cells = is_in_cluster[~is_in_cluster]

    # downsample the cluster, then cap the "rest" cells at
    # max_other_fold times the (possibly downsampled) cluster size
    if in_cells.size > max_cluster_cells:
        in_cells = in_cells.sample(max_cluster_cells, random_state=0)
    other_cap = in_cells.size * max_other_fold
    if out_cells.size > other_cap:
        out_cells = out_cells.sample(other_cap, random_state=0)

    return _single_ovr_dmg(
        cell_label=pd.concat([in_cells, out_cells]),
        mcds=mcds,
        obs_dim=obs_dim,
        var_dim=var_dim,
        mc_type=mc_type,
        top_n=top_n,
        adj_p_cutoff=adj_p_cutoff,
        fc_cutoff=fc_cutoff,
        auroc_cutoff=auroc_cutoff,
    )
def one_vs_rest_dmg(
    cell_meta,
    group,
    mcds=None,
    mcds_paths=None,
    obs_dim="cell",
    var_dim="gene",
    mc_type="CHN",
    top_n=1000,
    adj_p_cutoff=0.01,
    fc_cutoff=0.8,
    auroc_cutoff=0.8,
    max_cluster_cells=2000,
    max_other_fold=5,
    cpu=1,
    verbose=True,
):
    """
    Calculating cluster marker genes using one-vs-rest strategy.

    Parameters
    ----------
    cell_meta
        cell metadata containing cluster labels
    group
        the name of the cluster label column
    mcds
        cell-by-gene MCDS object for calculating DMG. Provide either mcds_paths or mcds.
    mcds_paths
        cell-by-gene MCDS paths for calculating DMG. Provide either mcds_paths or mcds.
    obs_dim
        dimension name of the cells
    var_dim
        dimension name of the features
    mc_type
        value to select methylation type in the mc_type dimension
    top_n
        report top N DMGs
    adj_p_cutoff
        adjusted P value cutoff to report significant DMG
    fc_cutoff
        mC fraction fold change cutoff to report significant DMG
    auroc_cutoff
        AUROC cutoff to report significant DMG
    max_cluster_cells
        The maximum number of cells from a group, downsample large group to this number
    max_other_fold
        The fold of other cell numbers compared to the cluster cell number
    cpu :
        number of cpus
    verbose :
        whether to print progress messages

    Returns
    -------
    dmg_table
        pandas Dataframe of the one-vs-rest DMGs

    Raises
    ------
    ValueError
        if neither ``mcds`` nor ``mcds_paths`` is provided.
    """
    # resolve the MCDS source; when an in-memory MCDS is given, persist it to a
    # temporary zarr store so worker processes can open it independently
    if mcds_paths is not None:
        tmp = None
    elif mcds is not None:
        tmp = 'tmp_one_vs_rest.mcds'
        mcds.to_zarr(tmp)
        mcds_paths = tmp
    else:
        # message fixed: the parameter is named "mcds_paths", not "mcds_path"
        raise ValueError('Need to provide either "mcds_paths" or "mcds".')

    clusters = cell_meta[group].unique()
    dmg_table = []
    try:
        with ProcessPoolExecutor(cpu) as exe:
            futures = {}
            for cluster in sorted(clusters):
                f = exe.submit(
                    _one_vs_rest_dmr_runner,
                    cell_meta=cell_meta,
                    group=group,
                    cluster=cluster,
                    max_cluster_cells=max_cluster_cells,
                    max_other_fold=max_other_fold,
                    mcds_paths=mcds_paths,
                    obs_dim=obs_dim,
                    var_dim=var_dim,
                    mc_type=mc_type,
                    top_n=top_n,
                    adj_p_cutoff=adj_p_cutoff,
                    fc_cutoff=fc_cutoff,
                    auroc_cutoff=auroc_cutoff,
                )
                futures[f] = cluster
            for f in as_completed(futures):
                cluster = futures[f]
                if verbose:
                    print(f"{cluster} Finished.")
                dmg_df = f.result()
                dmg_df["cluster"] = cluster
                dmg_table.append(dmg_df)
    finally:
        # BUGFIX: remove the temporary zarr store even when a worker raises
        # (previously it leaked on any failure)
        if tmp is not None:
            import shutil
            shutil.rmtree(tmp, ignore_errors=True)
    return pd.concat(dmg_table)