from ALLCools.utilities import parse_chrom_size, parse_mc_pattern
import pybedtools
from collections import defaultdict
import pathlib
import pandas as pd
import xarray as xr
import numpy as np
import pysam
from functools import lru_cache
from scipy import stats
from concurrent.futures import ProcessPoolExecutor, as_completed
from shutil import copyfile, rmtree
from .._doc import *
ALLOW_QUANT_TYPES = ['count', 'hypo-score', 'hyper-score']
@lru_cache(99999)
def bin_sf(cov, mc, p):
    """Cached binomial survival function: P(X > mc) for X ~ Binomial(cov, p)."""
if cov > mc:
return stats.binom(cov, p).sf(mc)
    else:
        # mc can not exceed cov, so here cov == mc and sf = P(X > cov) = 0
        return 0
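# Illustration (not part of the original module): sf is the binomial survival
# function, so bin_sf(cov, mc, p) = P(X > mc) for X ~ Binomial(cov, p). A
# lowly methylated, well-covered site scores high, marking a hypo-methylation
# candidate:
#
#   >>> bin_sf(10, 2, 0.5)
#   0.9453125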
def cell_sf(cell_count_df):
    """Per-region binomial survival-function values for one cell."""
    mc_sum, cov_sum = cell_count_df.sum()
    p = mc_sum / (cov_sum + 0.000001)  # pseudo-count prevents division by zero on an empty ALLC
pv = cell_count_df.apply(lambda x: bin_sf(x["cov"], x["mc"], p), axis=1).astype(
"float16"
)
return pv
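# Hedged usage sketch: `cell_count_df` is a per-region count table whose
# columns are ['mc', 'cov'] in that order (the .sum() above is unpacked as
# mc_sum, cov_sum):
#
#   >>> df = pd.DataFrame({'mc': [2, 9], 'cov': [10, 10]}, index=['r0', 'r1'])
#   >>> cell_sf(df)  # large for the hypo-methylated r0, small for r1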
class Quant:
    """Container for one quantifier: mC types, quantification type, and extra kwargs."""

    def __init__(self, mc_types, quant_type, kwargs):
self.mc_types = mc_types
self.quant_type = quant_type
self.kwargs = kwargs
class CountQuantifier:
    """Sum mC and coverage counts by mC type for a single region in a single ALLC file."""
def __init__(self, mc_types):
self.mc_types = mc_types
self.mc_data = defaultdict(int)
self.cov_data = defaultdict(int)
return
    def read_line(self, line):
        """Accumulate mc and cov counts from one tab-separated ALLC record."""
        chrom, pos, _, context, mc, cov, *_ = line.split("\t")
self.mc_data[context] += int(mc)
self.cov_data[context] += int(cov)
return
    def summary(self):
        """Return one [mc, cov] pair per mC type in self.mc_types."""
mc_type_data = []
for mc_type in self.mc_types:
mc = sum((self.mc_data[k] for k in parse_mc_pattern(mc_type)))
cov = sum((self.cov_data[k] for k in parse_mc_pattern(mc_type)))
mc_type_data.append([mc, cov])
return mc_type_data
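# Hedged usage sketch (the record layout follows read_line above;
# parse_mc_pattern is assumed to expand IUPAC codes, e.g. CGN -> CGA/CGC/CGG/CGT):
#
#   >>> cq = CountQuantifier(mc_types=['CGN'])
#   >>> cq.read_line('chr1\t100\t+\tCGA\t3\t5\t1\n')
#   >>> cq.read_line('chr1\t200\t+\tCGT\t1\t4\t1\n')
#   >>> cq.summary()
#   [[4, 9]]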
def determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir):
    """Parse region and quantifier specifications into per-region-set dataset configs."""
tmp_dir = pathlib.Path(tmp_dir).absolute()
tmp_dir.mkdir(exist_ok=True, parents=True)
chrom_sizes = parse_chrom_size(chrom_size_path)
datasets = {}
for pair in regions:
        if len(pair) != 2:
            raise ValueError(
                f'Each region must be a (name, region definition) pair, '
                f'got {pair} in regions parameter: {regions}')
name, region_path = pair
# prepare regions
if isinstance(region_path, (str, pathlib.Path)) \
and pathlib.Path(region_path).exists():
region_bed_df = pd.read_csv(region_path, sep='\t', header=None)
# remove additional chroms that do not occur in chrom_sizes
region_bed_df = region_bed_df[region_bed_df.iloc[:, 0].isin(
chrom_sizes)]
# sort chroms
region_bed_df = pybedtools.BedTool.from_dataframe(
region_bed_df).sort(g=chrom_size_path).to_dataframe()
            if region_bed_df.shape[1] == 3:
                # add an ID column
                print(
                    region_path,
                    'does not have an ID in its fourth column; adding one automatically. '
                    'If this is not desired, add a fourth column containing UNIQUE IDs to the BED file.'
                )
                region_bed_df[name] = [f'{name}_{i}'
                                       for i in range(region_bed_df.shape[0])]
            # check that region IDs are unique
if region_bed_df.iloc[:, 3].duplicated().sum() > 0:
raise ValueError(
f'Region IDs in {region_path} (fourth column) are not unique.'
)
# finally, set ID as index and only take the first three columns
region_bed_df = region_bed_df.iloc[:, [0, 1, 2, 3]].set_index(
region_bed_df.columns[3])
else:
try:
# if region is int, generate chrom bins with bedtools
region_size = int(region_path)
region_bed_df = pybedtools.BedTool().makewindows(
g=chrom_size_path, w=region_size,
s=region_size).to_dataframe()
# set region index
_dfs = []
for chrom, chrom_df in region_bed_df.groupby('chrom'):
chrom_df = chrom_df.reset_index(drop=True)
chrom_df.index = chrom_df.index.map(lambda i: f'{chrom}_{i}')
_dfs.append(chrom_df)
region_bed_df = pd.concat(_dfs)
            except ValueError:
                raise ValueError(
                    f'Cannot understand region specification {region_path}')
region_path = f'{tmp_dir}/{name}.regions.csv'
region_bed_df.to_csv(region_path)
datasets[name] = {'regions': region_path, 'quant': []}
for quantifier in quantifiers:
        if len(quantifier) < 3:
            raise ValueError(
                f'Quantifier must have at least three parts: '
                f'["NAME", "QUANT_TYPE", "MC_TYPE", "OTHER_KWARGS"], '
                f'where "OTHER_KWARGS" is optional. '
                f'Got {quantifier}')
name, quant_type, mc_types, *other_kwargs = quantifier
        if name not in datasets:
            raise KeyError(
                f'Name {name} occurs in quantifiers, but was not found in regions.')
kwargs = {}
for kv in other_kwargs:
k, v = kv.split('=')
try:
kwargs[k] = float(v)
except ValueError:
kwargs[k] = v
# prepare mc_types
mc_types = [i.strip() for i in mc_types.split(',')]
# prepare quant_types
        if quant_type not in ALLOW_QUANT_TYPES:
            raise ValueError(
                f'QUANT_TYPE must be one of {ALLOW_QUANT_TYPES}, got {quant_type} in {quantifier}.'
            )
datasets[name]['quant'].append(
Quant(mc_types=mc_types, quant_type=quant_type, kwargs=kwargs))
return datasets
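# Hedged examples of the two accepted region specifications (file names and
# region-set names below are illustrative, not fixed by the API):
#
#   regions = [
#       ('chrom100k', 100000),           # fixed-size genome bins
#       ('gene', 'gene_bodies.bed.gz'),  # an existing BED file
#   ]
#   quantifiers = [
#       ('chrom100k', 'count', 'CGN,CHN'),
#       ('gene', 'hypo-score', 'CGN', 'cutoff=0.9'),
#   ]
#   datasets = determine_datasets(regions, quantifiers,
#                                 chrom_size_path='mm10.chrom.sizes',
#                                 tmp_dir='tmp')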
def count_single_region_set(allc_table, region_config, obs_dim, region_dim):
    """Compute the cell-by-region-by-mc_type raw count matrix for one region set."""
total_mc_types = []
for quant in region_config['quant']:
total_mc_types += quant.mc_types
total_mc_types = list(set(total_mc_types))
total_data = []
for sample, allc_path in allc_table.items():
with pysam.TabixFile(allc_path) as allc:
region_ids = []
sample_data = []
region_chunks = pd.read_csv(region_config['regions'],
index_col=0,
chunksize=1000)
for chunk in region_chunks:
region_ids += chunk.index.tolist()
for region, (chrom, start, end) in chunk.iterrows():
count_quant = CountQuantifier(mc_types=total_mc_types)
try:
allc_lines = allc.fetch(chrom, start, end)
for line in allc_lines:
count_quant.read_line(line)
                    except ValueError:
                        # ValueError from fetch: this chrom does not exist in the ALLC file
                        pass
sample_data.append(count_quant.summary())
data = xr.DataArray(
np.array([sample_data]),
coords=[[sample], region_ids, total_mc_types, ['mc', 'cov']],
dims=[obs_dim, region_dim, 'mc_type', 'count_type'])
total_data.append(data)
total_data = xr.Dataset(
{f'{region_dim}_da': xr.concat(total_data, dim=obs_dim)})
return total_data
def calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):
    """Turn raw counts into per-cell survival-function scores."""
    pv = []
    for cell in data.get_index(obs_dim):
        value = cell_sf(data.sel({obs_dim: cell}).to_pandas())
pv.append(value)
pv = np.array(pv)
if reverse_value:
pv = 1 - pv
# get rid of small values, save space and memory
pv[pv < cutoff] = 0
pv = xr.DataArray(pv,
coords=[data.coords[obs_dim], data.coords[var_dim]],
dims=[obs_dim, var_dim])
pv = pv.astype('float16')
return pv
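# Score directionality, as implemented above: cell_sf yields P(X > mc), so
# with reverse_value=False large values flag hypo-methylated regions
# (hypo-score), and with reverse_value=True (1 - pv) they flag
# hyper-methylated regions (hyper-score). Values below `cutoff` are zeroed
# to keep the float16 arrays sparse-friendly.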
def count_single_zarr(allc_table,
region_config,
obs_dim,
region_dim,
output_path,
obs_dim_dtype,
count_dtype='uint32'):
"""process single region set and its quantifiers"""
# count all ALLC and mC types that's needed for quantifiers if this region_dim
count_ds = count_single_region_set(allc_table=allc_table,
region_config=region_config,
obs_dim=obs_dim,
region_dim=region_dim)
total_ds = {}
# deal with count quantifiers
count_mc_types = []
for quant in region_config['quant']:
if quant.quant_type == 'count':
count_mc_types += quant.mc_types
count_mc_types = list(set(count_mc_types))
if len(count_mc_types) > 0:
count_da = count_ds.sel(mc_type=count_mc_types)[f'{region_dim}_da']
max_int = np.iinfo(count_dtype).max
count_da = xr.where(count_da > max_int, max_int, count_da)
total_ds[f'{region_dim}_da'] = count_da.astype(count_dtype)
# deal with hypo-score, hyper-score quantifiers
for quant in region_config['quant']:
if quant.quant_type == 'hypo-score':
for mc_type in quant.mc_types:
data = calculate_pv(
data=count_ds.sel(mc_type=mc_type)[f'{region_dim}_da'],
reverse_value=False,
obs_dim=obs_dim,
var_dim=region_dim,
**quant.kwargs)
total_ds[f'{region_dim}_da_{mc_type}-hypo-score'] = data
elif quant.quant_type == 'hyper-score':
for mc_type in quant.mc_types:
data = calculate_pv(
count_ds.sel(mc_type=mc_type)[f'{region_dim}_da'],
reverse_value=True,
obs_dim=obs_dim,
var_dim=region_dim,
**quant.kwargs)
total_ds[f'{region_dim}_da_{mc_type}-hyper-score'] = data
total_ds = xr.Dataset(total_ds)
total_ds.coords[obs_dim] = total_ds.coords[obs_dim].astype(obs_dim_dtype)
total_ds.to_zarr(output_path, mode='w')
return output_path
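# Resulting zarr layout, following the code above: one raw count array per
# region set ('{region_dim}_da', dims obs x region x mc_type x count_type,
# clipped to the count_dtype range), plus one float16 array per score
# quantifier, named '{region_dim}_da_{mc_type}-hypo-score' or
# '{region_dim}_da_{mc_type}-hyper-score'.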
@doc_params(
generate_dataset_doc=generate_dataset_doc,
allc_table_doc=allc_table_doc,
chrom_size_path_doc=chrom_size_path_doc,
regions_doc=generate_dataset_regions_doc,
quantifiers_doc=generate_dataset_quantifiers_doc,
obs_dim_doc=generate_dataset_obs_dim_doc,
cpu_basic_doc=cpu_basic_doc,
chunk_size_doc=generate_dataset_chunk_size_doc
)
def generate_dataset(allc_table, output_path, regions, quantifiers, chrom_size_path,
obs_dim='cell', cpu=1, chunk_size=None):
"""\
{generate_dataset_doc}
Parameters
----------
allc_table
{allc_table_doc}
output_path
Output path of the MCDS dataset
regions
{regions_doc}
quantifiers
{quantifiers_doc}
chrom_size_path
{chrom_size_path_doc}
obs_dim
{obs_dim_doc}
cpu
{cpu_basic_doc}
chunk_size
{chunk_size_doc}
Returns
-------
output_path
"""
    if isinstance(allc_table, (str, pathlib.Path)):
        # `squeeze=True` was removed in pandas 2.0, so squeeze after reading
        allc_table = pd.read_csv(allc_table,
                                 sep='\t',
                                 header=None,
                                 index_col=0).squeeze('columns')
allc_table.index.name = obs_dim
# determine index length and str dtype
max_length = allc_table.index.map(lambda idx: len(idx)).max()
obs_dim_dtype = f'<U{max_length}'
# determine parallel chunk size
n_sample = allc_table.size
if chunk_size is None:
chunk_size = min(n_sample, 50)
# prepare regions and determine quantifiers
pathlib.Path(output_path).mkdir(exist_ok=True)
tmp_dir = f'{output_path}_tmp'
datasets = determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir)
    # copy chrom_size_path to output_path
    copyfile(chrom_size_path, f'{output_path}/chrom_sizes.txt')
chunk_records = defaultdict(dict)
with ProcessPoolExecutor(cpu) as exe:
futures = {}
# parallel on allc chunks and region_sets levels
for i, chunk_start in enumerate(range(0, n_sample, chunk_size)):
            allc_chunk = allc_table.iloc[chunk_start:chunk_start + chunk_size]
for region_dim, region_config in datasets.items():
chunk_path = f'{tmp_dir}/chunk_{region_dim}_{chunk_start}.zarr'
f = exe.submit(count_single_zarr,
allc_table=allc_chunk,
region_config=region_config,
obs_dim=obs_dim,
region_dim=region_dim,
output_path=chunk_path,
obs_dim_dtype=obs_dim_dtype)
futures[f] = (region_dim, i)
for f in as_completed(futures):
region_dim, i = futures[f]
chunk_path = f.result()
print(f'Chunk {i} of {region_dim} returned')
chunk_records[region_dim][i] = chunk_path
for region_dim, chunks in chunk_records.items():
# write chunk in order
chunk_paths = pd.Series(chunks).sort_index().tolist()
for i, chunk_path in enumerate(chunk_paths):
ds = xr.open_zarr(chunk_path).load()
# dump chunk to final place
if i == 0:
# first chunk
ds.to_zarr(f'{output_path}/{region_dim}',
mode='w')
else:
# append
ds.to_zarr(f'{output_path}/{region_dim}',
append_dim=obs_dim)
rmtree(chunk_path)
# save region coords to the ds
bed = pd.read_csv(f'{tmp_dir}/{region_dim}.regions.csv', index_col=0)
bed.columns = [f'{region_dim}_chrom', f'{region_dim}_start', f'{region_dim}_end']
bed.index.name = region_dim
# append region bed to the saved ds
ds = xr.Dataset()
for col, data in bed.items():
ds.coords[col] = data
# change object dtype to string
for k in ds.coords.keys():
if ds.coords[k].dtype == 'O':
ds.coords[k] = ds.coords[k].astype(str)
ds.to_zarr(f'{output_path}/{region_dim}', mode='a')
# delete tmp
rmtree(tmp_dir)
    # local import, likely kept here to avoid a circular import at module load
    from ..mcds.utilities import update_dataset_config
update_dataset_config(output_path,
config={"region_dim": None,
"ds_region_dim": {region_dim: region_dim
for region_dim in datasets.keys()},
"ds_sample_dim": {region_dim: obs_dim
for region_dim in datasets.keys()}})
return output_path
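# Hedged end-to-end usage sketch (paths, names, and the region/quantifier
# choices are illustrative):
#
#   >>> generate_dataset(
#   ...     allc_table='allc_table.tsv',  # TSV: sample <tab> ALLC path, no header
#   ...     output_path='dataset.mcds',
#   ...     regions=[('chrom100k', 100000)],
#   ...     quantifiers=[('chrom100k', 'count', 'CGN,CHN')],
#   ...     chrom_size_path='mm10.chrom.sizes',
#   ...     obs_dim='cell',
#   ...     cpu=4)
#   'dataset.mcds'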