import pathlib
import shlex
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from typing import List
import pandas as pd
import pybedtools
from ._doc import *
from ._extract_allc import extract_allc
from ._open import open_gz
from .utilities import (
check_tbi_chroms,
parse_chrom_size,
chrom_dict_to_id_index,
get_bin_id,
_transfer_bin_size,
generate_chrom_bin_bed_dataframe,
)
def _map_to_sparse_chrom_bin(site_bed, out_bed, chrom_size_path, bin_size=500):
    """
    Aggregate per-site counts into fixed-size, non-overlapping chromosome bins.

    Output is SPARSE: only bins with cov > 0 are written. bin_id is
    constructed from chrom_size_path, so it is reproducible across runs.

    Parameters
    ----------
    site_bed
        Path to the site-level BED file (opened via open_gz, so it may be
        gzipped) with columns: chrom, pos, pos, mc, cov. Sites are assumed
        sorted by chromosome (in chrom_size_path order) then position.
    out_bed
        Output path; written via open_gz with columns:
        chrom, bin_start, bin_end, bin_id, mc, cov.
    chrom_size_path
        Path to the chrom size file defining chromosome order and lengths.
    bin_size
        Width of each genome bin in base pairs.

    Returns
    -------
    out_bed, the output path (for collection by callers running in parallel).
    """
    chrom_dict = parse_chrom_size(chrom_size_path)
    chrom_index_dict = chrom_dict_to_id_index(chrom_dict, bin_size)

    # Accumulator state for the bin currently being filled.
    cur_chrom = "TOTALLY_NOT_A_CHROM"  # sentinel that never matches a real chrom
    cur_chrom_end = 0
    bin_end = min(cur_chrom_end, bin_size)
    bin_start = 0
    bin_id = -1
    temp_mc, temp_cov = -1, -1  # cov <= 0 marks "no bin open yet"

    def _flush(handle):
        # Emit the accumulated bin only if it has coverage (sparse output).
        if temp_cov > 0:
            handle.write(
                "\t".join(
                    map(str, [cur_chrom, bin_start, bin_end, bin_id, temp_mc, temp_cov])
                )
                + "\n"
            )

    with open_gz(site_bed) as in_handle, open_gz(out_bed, "w") as out_handle:
        for line in in_handle:
            # site-bed format: chrom, pos, pos, mc, cov
            chrom, pos, _, mc, cov = line.split("\t")
            pos = int(pos)
            mc = int(mc)
            cov = int(cov)
            if pos >= bin_end or cur_chrom != chrom:
                # Site falls outside the open bin: write it out, then start
                # a new bin anchored at this site's position.
                _flush(out_handle)
                if cur_chrom != chrom:
                    cur_chrom = chrom
                    cur_chrom_end = chrom_dict[chrom]
                temp_mc, temp_cov = mc, cov
                bin_start = int(pos // bin_size * bin_size)
                # Clip the last bin of each chromosome to the chromosome end.
                bin_end = min(cur_chrom_end, bin_start + bin_size)
                bin_id = get_bin_id(cur_chrom, chrom_index_dict, bin_start, bin_size)
            else:
                temp_mc += mc
                temp_cov += cov
        # Write the final accumulated bin.
        _flush(out_handle)
    return out_bed
@doc_params(
    allc_path_doc=allc_path_doc,
    chrom_size_path_doc=chrom_size_path_doc,
    mc_contexts_doc=mc_contexts_doc,
    cov_cutoff_doc=cov_cutoff_doc,
    split_strand_doc=split_strand_doc,
    cpu_basic_doc=cpu_basic_doc,
    bin_sizes_doc=bin_sizes_doc,
    region_bed_paths_doc=region_bed_paths_doc,
    region_bed_names_doc=region_bed_names_doc,
    binarize_doc=binarize_doc,
)
def allc_to_region_count(
    allc_path: str,
    output_prefix: str,
    chrom_size_path: str,
    mc_contexts: List[str],
    split_strand: bool = False,
    region_bed_paths: List[str] = None,
    region_bed_names: List[str] = None,
    bin_sizes: List[int] = None,
    cov_cutoff: int = 9999,
    save_zero_cov: bool = False,
    remove_tmp: bool = True,
    cpu: int = 1,
    binarize: bool = False,
):
    """\
    Calculate mC and cov at regional level. Region can be provided in 2 forms:
    1. BED file, provided by region_bed_paths, containing arbitrary regions and use bedtools map to calculate;
    2. Fix-size non-overlap genome bins, provided by bin_sizes.
    Form 2 is much faster to calculate than form 1.
    The output file is in 6-column bed-like format: chrom start end region_uid mc cov

    Parameters
    ----------
    allc_path
        {allc_path_doc}
    output_prefix
        Path prefix of the output region count file.
    chrom_size_path
        {chrom_size_path_doc}
    mc_contexts
        {mc_contexts_doc}
    split_strand
        {split_strand_doc}
    region_bed_paths
        {region_bed_paths_doc}
    region_bed_names
        {region_bed_names_doc}
    bin_sizes
        {bin_sizes_doc}
    cov_cutoff
        {cov_cutoff_doc}
    save_zero_cov
        Whether to save the regions that have 0 cov, only apply to region count but not the chromosome count
    remove_tmp
        Whether to remove the temporary BED file
    cpu
        {cpu_basic_doc}
        This function parallel on region level at the extraction step
        and will generate a bunch of small files if cpu > 1.
        Do not use cpu > 1 for single cell region count.
        For single cell data, parallel on cell level is better.
    binarize
        {binarize_doc}

    Returns
    -------
    List of output file paths (one per region set / context / strand combination).
    """
    cpu = int(cpu)
    genome_dict = parse_chrom_size(chrom_size_path)
    if bin_sizes is None and region_bed_paths is None:
        raise ValueError("Either bin_sizes or region_bed_paths should be provided.")

    # Check each region BED file:
    # 1. must be bgzipped and tabixed;
    # 2. chromosome order must match chrom_size_path.
    if region_bed_paths is not None:
        if (region_bed_names is None) or (
            len(region_bed_names) != len(region_bed_paths)
        ):
            raise ValueError("Different number of bed names and paths provided")
        for region_name, region_bed_path in zip(region_bed_names, region_bed_paths):
            if not check_tbi_chroms(region_bed_path, genome_dict):
                raise ValueError(
                    f"Make sure the bed file {region_bed_path} is:"
                    f"1. bgzip and tabixed;"
                    f"2. chromosome order is same as the {chrom_size_path}."
                )

    # Extract per-context site BED files from the ALLC file.
    # Strip trailing dots so suffixes below join cleanly.
    output_prefix = output_prefix.rstrip(".")
    strandness = "split" if split_strand else "both"
    output_paths_dict = extract_allc(
        allc_path=allc_path,
        output_prefix=output_prefix,
        mc_contexts=mc_contexts,
        strandness=strandness,
        output_format="bed5",
        chrom_size_path=chrom_size_path,
        region=None,
        cov_cutoff=cov_cutoff,
        cpu=cpu,
        binarize=binarize,
    )

    # Key the extracted paths by "{context}-{strand}" according to the
    # extract_allc return format: (mc_context, strand, _) -> path.
    path_dict = {}
    for (mc_context, strand, _), path in output_paths_dict.items():
        info_type = f"{mc_context}-{strand}"
        path_dict[info_type] = path

    with ProcessPoolExecutor(cpu) as executor:
        futures = []
        if region_bed_paths is not None:
            # Map sites onto arbitrary regions via bedtools map.
            save_flag = "full" if save_zero_cov else "sparse"
            for region_name, region_bed_path in zip(region_bed_names, region_bed_paths):
                for info_type, site_bed_path in path_dict.items():
                    future = executor.submit(
                        _bedtools_map,
                        region_bed=region_bed_path,
                        site_bed=site_bed_path,
                        out_bed=output_prefix + f".{region_name}"
                        f"_{info_type}.{save_flag}.bed.gz",
                        chrom_size_path=chrom_size_path,
                        save_zero_cov=save_zero_cov,
                    )
                    futures.append(future)
        if bin_sizes is not None:
            # Map sites onto fixed-size chromosome bins (always sparse).
            for bin_size in bin_sizes:
                for info_type, site_bed_path in path_dict.items():
                    future = executor.submit(
                        _map_to_sparse_chrom_bin,
                        site_bed=site_bed_path,
                        out_bed=output_prefix + f".chrom{_transfer_bin_size(bin_size)}"
                        f"_{info_type}.sparse.bed.gz",
                        chrom_size_path=chrom_size_path,
                        bin_size=bin_size,
                    )
                    futures.append(future)

        output_collection = []
        for future in as_completed(futures):
            # result() re-raises any worker exception, so failures surface here.
            output_collection.append(future.result())

    if remove_tmp:
        # Remove temporary extracted site BED files and their tabix indices.
        # Use pathlib instead of spawning external `rm` processes; a missing
        # file (e.g. no .tbi generated) is not an error.
        for site_bed_path in path_dict.values():
            for tmp_path in (site_bed_path, site_bed_path + ".tbi"):
                try:
                    pathlib.Path(tmp_path).unlink()
                except FileNotFoundError:
                    pass

    # TODO collect all output path, return a informative dict
    return output_collection
def batch_allc_to_region_count(
    allc_series,
    output_dir,
    chrom_size_path,
    mc_contexts,
    split_strand,
    bin_sizes=None,
    region_bed_paths=None,
    region_bed_names=None,
    cov_cutoff=9999,
    cpu=5,
    binarize=False,
):
    """
    Run allc_to_region_count for a batch of ALLC files in parallel.

    Region BED files are first filtered to chromosomes present in
    chrom_size_path, sorted, bgzipped and tabixed into output_dir, and
    region-id / region-coordinate lookup tables are dumped as HDF files.
    Each ALLC file is then processed by one worker process, and a summary
    table of all output file paths is written to
    output_dir / "REGION_COUNT_SUMMARY.hdf".

    Parameters
    ----------
    allc_series
        pandas Series mapping file/cell id (index) to ALLC file path (values).
    output_dir
        Directory for all outputs; created if it does not exist.
    chrom_size_path
        Path of the chrom size file defining chromosome order and lengths.
    mc_contexts
        Methylation contexts, passed through to allc_to_region_count.
    split_strand
        Whether to split counts by strand, passed through to allc_to_region_count.
    bin_sizes
        Fixed genome bin sizes to count, passed through to allc_to_region_count.
    region_bed_paths
        Arbitrary region BED files to count, passed through after sorting/indexing.
    region_bed_names
        Names matching region_bed_paths, used in output file names.
    cov_cutoff
        Coverage cutoff, passed through to allc_to_region_count.
    cpu
        Number of worker processes (parallel on the cell/file level).
    binarize
        Passed through to allc_to_region_count.
    """
    output_dir = pathlib.Path(output_dir).resolve()
    output_dir.mkdir(exist_ok=True)

    # Dump region BED files into output_dir for future records, after
    # sorting by chrom_size_path order, bgzipping and tabixing.
    if region_bed_paths is not None:
        new_paths = []
        for region_bed_name, region_bed_path in zip(region_bed_names, region_bed_paths):
            bed_df = pd.read_csv(region_bed_path, sep="\t", header=None)
            chrom_size = parse_chrom_size(chrom_size_path)
            # Drop regions on chromosomes absent from the chrom size file.
            bed_df = bed_df[bed_df.iloc[:, 0].isin(chrom_size.keys())].copy()
            bed_sorted_df = (
                pybedtools.BedTool.from_dataframe(bed_df)
                .sort(g=chrom_size_path)
                .to_dataframe()
            )
            new_path = str(output_dir / f"{region_bed_name}.bed")
            bed_sorted_df.to_csv(new_path, sep="\t", index=False, header=False)
            subprocess.run(["bgzip", "-f", new_path], check=True)
            subprocess.run(["tabix", "-f", f"{new_path}.gz"], check=True)
            new_path = f"{new_path}.gz"
            new_paths.append(new_path)

            # Save region id and coordinate lookup tables as HDF.
            # Column 3 (0-based) of the sorted BED is used as the region index.
            bed_df = pd.read_csv(new_path, header=None, index_col=3, sep="\t")
            bed_df.columns = ["chrom", "start", "end"]
            bed_df.index.name = region_bed_name
            bed_df["int_id"] = list(range(0, bed_df.shape[0]))
            bed_df["int_id"].to_hdf(
                output_dir / f"REGION_ID_{region_bed_name}.hdf", key="data"
            )
            bed_df.iloc[:, :3].to_hdf(
                output_dir / f"REGION_BED_{region_bed_name}.hdf", key="data"
            )
        # Use the sorted, indexed copies from here on.
        region_bed_paths = new_paths

    if bin_sizes is not None:
        # Save the same lookup tables for fixed-size chromosome bins.
        for bin_size in bin_sizes:
            bin_size_chr = _transfer_bin_size(bin_size)
            region_name = f"chrom{bin_size_chr}"
            bed_df = generate_chrom_bin_bed_dataframe(
                chrom_size_path=chrom_size_path,
                window_size=bin_size,
                step_size=bin_size,
            )
            bed_df["int_id"] = list(range(0, bed_df.shape[0]))
            bed_df["int_id"].to_hdf(
                output_dir / f"REGION_ID_{region_name}.hdf", key="data"
            )
            bed_df.iloc[:, :3].to_hdf(
                output_dir / f"REGION_BED_chrom{bin_size_chr}.hdf", key="data"
            )

    with ProcessPoolExecutor(cpu) as executor:
        futures = {}
        # Series.iteritems() was removed in pandas 2.0; .items() is the
        # equivalent and works on all supported pandas versions.
        for cell_id, allc_path in allc_series.items():
            future = executor.submit(
                allc_to_region_count,
                allc_path=allc_path,
                output_prefix=str(output_dir / cell_id),
                chrom_size_path=chrom_size_path,
                mc_contexts=mc_contexts,
                split_strand=split_strand,
                region_bed_paths=region_bed_paths,
                region_bed_names=region_bed_names,
                bin_sizes=bin_sizes,
                cov_cutoff=cov_cutoff,
                save_zero_cov=False,
                remove_tmp=True,
                cpu=1,  # parallelism is on the cell level here
                binarize=binarize,
            )
            futures[future] = cell_id

        records = {}
        finish_count = 0
        for future in as_completed(futures):
            cell_id = futures[future]
            try:
                output_path_collect = future.result()
                records[cell_id] = output_path_collect
            except Exception as e:
                print(f"{cell_id} raised an error!")
                raise e
            finish_count += 1
            if finish_count % 100 == 0:
                print(f"{finish_count} allc-to-region-count finished.")

    # Parse output file names back into (region, context, strand) and save a
    # summary table. File names look like:
    # {prefix}.{region_name}_{mc_type}-{strand}.{sparse|full}.bed.gz
    # NOTE(review): this parse assumes region_name itself contains no "_" or
    # "-" characters — confirm against allowed region_bed_names.
    path_records = []
    for file_id, out_paths in records.items():
        for path in out_paths:
            file_name = pathlib.Path(path).name
            *_, region_mc_type_strand, _, _, _ = file_name.split(".")
            region_name, mc_type, strandness = region_mc_type_strand.replace(
                "_", "-"
            ).split("-")
            path_dict = {
                "region_name": region_name,
                "mc_type": mc_type,
                "strandness": strandness,
                "file_id": file_id,
                "file_path": path,
            }
            path_records.append(path_dict)
    path_df = pd.DataFrame(path_records)
    path_df.to_hdf(output_dir / "REGION_COUNT_SUMMARY.hdf", key="data")
    return