import pysam
import pandas as pd
import numpy as np
from scipy import stats
from scipy.sparse import csr_matrix
import pathlib
import anndata
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
from ..utilities import parse_mc_pattern
from functools import lru_cache
from .._doc import *
[docs]def _read_region_bed(bed_path):
region_bed = pd.read_csv(bed_path, sep="\t", header=None, index_col=3)
region_bed.index.name = "region"
region_bed.columns = ["chrom", "start", "end"]
return region_bed
@lru_cache(99999)
[docs]def bin_sf(cov, mc, p):
if cov > mc:
return stats.binom(cov, p).sf(mc)
else:
# cov == mc, sf = 0
return 0
[docs]def _count_single_allc(
allc_path, bed_path, mc_pattern, output_dir, cutoff=0.9, reverse_value=False
):
patterns = parse_mc_pattern(mc_pattern)
region_bed = _read_region_bed(bed_path)
# bin raw counts
with pysam.TabixFile(allc_path, "r") as allc:
records = [] # list of [mc, cov, region_idx]
for idx, (region_id, (chrom, start, end)) in enumerate(region_bed.iterrows()):
total_mc = 0
total_cov = 0
try:
iterator = allc.fetch(chrom, start, end)
except ValueError:
# in low coverage cells, allc file might miss a whole chromosome,
# this cause value error saying "could not create iterator for region"
continue
for line in iterator:
chrom, pos, _, context, mc, cov, _ = line.split("\t")
if context in patterns:
total_mc += int(mc)
total_cov += int(cov)
if total_cov > 0:
records.append([idx, total_mc, total_cov])
bin_counts = pd.DataFrame(records, columns=["idx", "mc", "cov"]).set_index(
"idx"
)
# calculate binom sf (1-cdf) value, hypo bins are close to 1, hyper bins are close to 0
mc_sum, cov_sum = bin_counts.sum()
p = mc_sum / (cov_sum + 0.000001) # prevent empty allc error
pv = bin_counts.apply(lambda x: bin_sf(x["cov"], x["mc"], p), axis=1).astype(
"float16"
)
if reverse_value:
# use cdf instead of sf when looking for hyper methylation
pv = 1 - pv
pv = pv[pv > cutoff] # get rid of most hyper bins
pv.to_hdf(f"{output_dir}/{pathlib.Path(allc_path).name}.hdf", key="data")
return
@doc_params(
allc_table_doc=allc_table_doc,
bed_path_doc=region_bed_path_mcad_doc,
cpu_doc=cpu_basic_doc,
mc_context_doc=mc_context_mcad_doc,
[docs])
def generate_mcad(
allc_table,
bed_path,
output_prefix,
mc_context,
cpu=1,
cleanup=True,
cutoff=0.9,
reverse_value=False,
):
"""
Parameters
----------
allc_table
{allc_table_doc}
bed_path
{bed_path_doc}
cpu
{cpu_doc}
output_prefix
Output prefix of the MCAD, a suffix ".mcad" will be added.
mc_context
{mc_context_doc}
cleanup
Whether remove temp files or not
cutoff
Values smaller than cutoff will be stored as 0, which reduces the file size
reverse_value
If true, use cdf instead of sf to make hyper-methylation events having higher values
Returns
-------
"""
# validate
if (cutoff < 0) or (cutoff > 1):
raise ValueError(f"Cutoff must between 0 to 1, got {cutoff}.")
# allc table has 2 columns: cell_id \t allc_path
allc_paths = pd.read_csv(
allc_table, sep="\t", index_col=0, header=None, squeeze=True
)
allc_paths.index.name = "cell"
# temp dir
_name = pathlib.Path(output_prefix).name
temp_dir = pathlib.Path(f"{_name}_pv_temp")
temp_dir.mkdir(exist_ok=True)
# calculating individual cells
with ProcessPoolExecutor(cpu) as executor:
futures = {}
allc_path_idy = {}
for idy, (cell_id, allc_path) in enumerate(allc_paths.items()):
allc_path_idy[pathlib.Path(allc_path).name] = idy
output_path = temp_dir / f"{pathlib.Path(allc_path).name}.hdf"
if output_path.exists():
continue
future = executor.submit(
_count_single_allc,
allc_path=allc_path,
bed_path=bed_path,
mc_pattern=mc_context,
output_dir=temp_dir,
cutoff=cutoff,
reverse_value=reverse_value,
)
futures[future] = cell_id
for future in as_completed(futures):
cell_id = futures[future]
print(f"{cell_id} returned.")
future.result()
# aggregate all the cells
print("Aggregate cells into adata")
total_idx = []
total_idy = []
total_data = []
for path in temp_dir.glob("*hdf"):
idy = allc_path_idy[path.name[:-4]]
pv = pd.read_hdf(path)
if pv.size == 0:
# no sig result
continue
total_idx.append(pv.index.values)
total_idy.append([idy] * pv.size)
total_data.append(pv.values)
# the cell by region matrix
region_bed = _read_region_bed(bed_path)
_data = csr_matrix(
(
np.concatenate(total_data),
(np.concatenate(total_idy), np.concatenate(total_idx)),
),
shape=(len(allc_path_idy), region_bed.shape[0]),
)
# save the data as anndata
adata = anndata.AnnData(
_data, obs=pd.DataFrame([], index=allc_paths.index), var=region_bed
)
adata.X = adata.X.astype("float16")
if str(output_prefix)[-5:] in {".mcad", ".h5ad"}:
output_h5ad_path = output_prefix
else:
output_h5ad_path = f"{output_prefix}.mcad"
adata.write_h5ad(pathlib.Path(output_h5ad_path))
# remove temp
if cleanup:
subprocess.run(["rm", "-rf", str(temp_dir)])
return