Source code for ALLCools._extract_allc

from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from subprocess import run
from typing import Union, Tuple, Callable

import pandas as pd

from ._doc import *
from ._open import open_allc, open_gz
from .utilities import (
    tabix_allc,
    parse_mc_pattern,
    parse_chrom_size,
    genome_region_chunks,
    binary_count,
)


def _merge_cg_strand(in_path, out_path):
    """
    Merge strand after extract context step in extract_allc (and only apply on CG),
    so no need to check context.
    """
    prev_line = None
    cur_chrom = None
    with open_allc(in_path) as allc, open_allc(out_path, "w") as out_allc:
        for line in allc:
            cur_line = line.strip("\n").split("\t")
            if cur_line[0] != cur_chrom:
                if prev_line is not None:
                    out_allc.write("\t".join(prev_line) + "\n")
                prev_line = cur_line
                cur_chrom = cur_line[0]
                continue
            if prev_line is None:
                prev_line = cur_line
                continue
            else:
                # pos should be continuous, strand should be reverse
                if (
                    int(prev_line[1]) + 1 == int(cur_line[1])
                    and prev_line[2] != cur_line[2]
                ):
                    new_line = prev_line[:4] + [
                        str(int(prev_line[4]) + int(cur_line[4])),
                        str(int(prev_line[5]) + int(cur_line[5])),
                        "1",
                    ]
                    prev_line = None
                # otherwise, only write and update prev_line
                else:
                    if prev_line[2] == "-":
                        # change all the '-' strand to '+' strand
                        new_line = [
                            prev_line[0],
                            str(int(prev_line[1]) - 1),
                            "+",
                        ] + prev_line[3:]
                    else:
                        new_line = prev_line.copy()
                    prev_line = cur_line
                out_allc.write("\t".join(new_line) + "\n")
    return
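
# Illustrative sketch (not part of the original module): _merge_cg_strand collapses
# a CpG covered on both strands into a single '+' strand record at the Watson
# position, summing mc (column 5) and cov (column 6). The file names below are
# hypothetical placeholders. Given an input ALLC file "CpG.allc.tsv.gz" containing:
#   chr1    100    +    CGA    3    5    1
#   chr1    101    -    CGT    2    4    1
# the call
#   _merge_cg_strand("CpG.allc.tsv.gz", "CpG.merged.tsv.gz")
# would write the single merged record:
#   chr1    100    +    CGA    5    9    1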
def _check_strandness_parameter(strandness) -> str:
    strandness = str(strandness).lower()
    if strandness in {"both", "b"}:
        return "Both"
    elif strandness in {"merge", "mergetmp", "m"}:
        # first getting both, deal with strand merge later
        return "MergeTmp"
    elif strandness in {"split", "s"}:
        return "Split"
    else:
        raise ValueError(f"Unknown value for strandness: {strandness}")
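
# For reference (sketch, not part of the original module): the accepted spellings
# are case-insensitive, for example
#   _check_strandness_parameter("b")      -> "Both"
#   _check_strandness_parameter("Merge")  -> "MergeTmp"
#   _check_strandness_parameter("split")  -> "Split"
# and anything else raises ValueError.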
def _check_out_format_parameter(
    out_format, binarize=False
) -> Tuple[str, Callable[[list], str]]:
    if binarize:

        def _extract_allc_format(allc_line_list):
            # keep allc format
            # mc and cov is binarized
            allc_line_list[4], allc_line_list[5] = binary_count(
                int(allc_line_list[4]), int(allc_line_list[5])
            )
            return "\t".join(map(str, allc_line_list))

        def _extract_bed5_format(allc_line_list):
            # only chrom, pos, pos, mc, cov
            # mc and cov is binarized
            allc_line_list[4], allc_line_list[5] = binary_count(
                int(allc_line_list[4]), int(allc_line_list[5])
            )
            allc_line_list = [allc_line_list[i] for i in [0, 1, 1, 4, 5]]
            return "\t".join(map(str, allc_line_list)) + "\n"

    else:

        def _extract_allc_format(allc_line_list):
            # keep allc format
            return "\t".join(allc_line_list)

        def _extract_bed5_format(allc_line_list):
            # only chrom, pos, pos, mc, cov
            allc_line_list = [allc_line_list[i] for i in [0, 1, 1, 4, 5]]
            return "\t".join(allc_line_list) + "\n"

    out_format = str(out_format).lower()
    if out_format == "allc":
        return "allc.tsv.gz", _extract_allc_format
    elif out_format == "bed5":
        return "bed5.bed.gz", _extract_bed5_format
    else:
        raise ValueError(f"Unknown value for out_format: {out_format}")
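
# Sketch of how the returned (suffix, formatter) pair behaves (not part of the
# original module). With binarize=False and out_format="bed5", the formatter turns
# one tab-split ALLC line into a 5-column BED record:
#   suffix, line_func = _check_out_format_parameter("bed5")
#   # suffix == "bed5.bed.gz"
#   line_func(["chr1", "100", "+", "CGA", "3", "5", "1\n"])
#   # -> "chr1\t100\t100\t3\t5\n"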
def _merge_gz_files(file_list, output_path):
    """
    Merge the small chunk files generated by _extract_allc_parallel,
    remove the small files after merge
    """
    with open(output_path + "tmp", "w") as out_f:
        for file_path in file_list:
            with open_gz(file_path, "rt") as f:
                out_f.write(f.read())
            run(["rm", "-f", file_path])
    run(["bgzip", output_path + "tmp"], check=True)
    run(["mv", "-f", output_path + "tmp.gz", output_path], check=True)
    return output_path
def _extract_allc_parallel(
    allc_path,
    output_prefix,
    mc_contexts,
    strandness,
    output_format,
    chrom_size_path,
    cov_cutoff,
    cpu,
    chunk_size=100000000,
    tabix=True,
):
    """
    Parallel extract_allc on region level,
    then parallel merge region chunk files to the final output in order.
    Same input and output as extract_allc, but will generate a bunch of small files during running.
    Don't use this on small files.
    """
    output_prefix = output_prefix.rstrip(".")

    regions = genome_region_chunks(
        chrom_size_path=chrom_size_path, bin_length=chunk_size, combine_small=True
    )

    future_dict = {}
    with ProcessPoolExecutor(cpu) as executor:
        for chunk_id, region in enumerate(regions):
            future = executor.submit(
                extract_allc,
                allc_path=allc_path,
                output_prefix=output_prefix + f".{chunk_id}.",
                mc_contexts=mc_contexts,
                strandness=strandness,
                output_format=output_format,
                chrom_size_path=chrom_size_path,
                region=region,
                cov_cutoff=cov_cutoff,
                cpu=1,
                tabix=False,
            )
            future_dict[future] = chunk_id

        output_records = {}
        for future in as_completed(future_dict):
            output_path_dict = future.result()
            chunk_id = future_dict[future]
            output_records[chunk_id] = output_path_dict

    # agg chunk_output
    records = []
    for chunk_id, paths_dict in output_records.items():
        for (mc_context, strandness, out_suffix), path in paths_dict.items():
            records.append([path, chunk_id, mc_context, strandness, out_suffix])
    total_output_df = pd.DataFrame(
        records,
        columns=["path", "chunk_id", "mc_context", "strandness", "out_suffix"],
    )

    real_out_paths_dict = {}
    need_tabix = []
    with ProcessPoolExecutor(cpu) as merge_executor:
        futures = []
        for (mc_context, strandness, out_suffix), sub_df in total_output_df.groupby(
            ["mc_context", "strandness", "out_suffix"]
        ):
            ordered_index = sub_df["chunk_id"].astype(int).sort_values().index
            ordered_file_list = sub_df.loc[ordered_index, "path"].tolist()
            real_file_path = f"{output_prefix}.{mc_context}-{strandness}.{out_suffix}"
            real_out_paths_dict[(mc_context, strandness, out_suffix)] = real_file_path
            if tabix and "allc" in out_suffix:
                need_tabix.append(real_file_path)
            future = merge_executor.submit(
                _merge_gz_files, file_list=ordered_file_list, output_path=real_file_path
            )
            futures.append(future)
        for future in as_completed(futures):
            future.result()

    if tabix:
        with ProcessPoolExecutor(cpu) as tabix_executor:
            futures = []
            for path in need_tabix:
                future = tabix_executor.submit(tabix_allc, path)
                futures.append(future)
            for future in as_completed(futures):
                future.result()
    return real_out_paths_dict
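
# Naming sketch (not part of the original module), using a hypothetical prefix
# "sample" and context "CGN" with strandness "Both": each genome chunk is written
# with the chunk id inserted into the prefix, e.g.
#   "sample.0.CGN-Both.allc.tsv.gz", "sample.1.CGN-Both.allc.tsv.gz", ...
# and _merge_gz_files then concatenates them in chunk order into the final
#   "sample.CGN-Both.allc.tsv.gz"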
@doc_params(
    allc_path_doc=allc_path_doc,
    mc_contexts_doc=mc_contexts_doc,
    cov_cutoff_doc=cov_cutoff_doc,
    chrom_size_path_doc=chrom_size_path_doc,
    strandness_doc=strandness_doc,
    region_doc=region_doc,
    cpu_basic_doc=cpu_basic_doc,
    binarize_doc=binarize_doc,
)
def extract_allc(
    allc_path: str,
    output_prefix: str,
    mc_contexts: Union[str, list],
    chrom_size_path: str,
    strandness: str = "both",
    output_format: str = "allc",
    region: str = None,
    cov_cutoff: int = 9999,
    tabix: bool = True,
    cpu=1,
    binarize=False,
):
    """\
    Extract information (strand, context) from a single ALLC file. Save to several formats.

    Parameters
    ----------
    allc_path
        {allc_path_doc}
    output_prefix
        Path prefix of the output ALLC file.
    mc_contexts
        {mc_contexts_doc}
    strandness
        {strandness_doc}
    output_format
        Output format of extracted information, possible values are:
        1. allc: keep the allc format
        2. bed5: 5-column bed format, chrom, pos, pos, mc, cov
    chrom_size_path
        {chrom_size_path_doc}
        If chrom_size_path is provided, it will be used to extract the ALLC file in chromosome order;
        if region is provided, this is ignored.
    region
        {region_doc}
    cov_cutoff
        {cov_cutoff_doc}
    tabix
        Whether to generate a tabix index when the output format is ALLC;
        only set this to False from _extract_allc_parallel.
    cpu
        {cpu_basic_doc}
        This function parallelizes at the region level and will generate a bunch of small files if cpu > 1.
        Do not use cpu > 1 for single-cell region count; for single-cell data, parallelizing at the cell level is better.
    binarize
        {binarize_doc}

    Returns
    -------
    A dict of output file paths keyed by (mc_context, strandness, out_suffix), not including index files.
    """
    parallel_chunk_size = 100000000

    # determine region and parallel
    parallel = False
    if region is None:
        if chrom_size_path is not None:
            chrom_dict = parse_chrom_size(chrom_size_path)
            region = " ".join(chrom_dict.keys())
        if cpu > 1:
            parallel = True

    # prepare params
    output_prefix = output_prefix.rstrip(".")
    if isinstance(mc_contexts, str):
        mc_contexts = mc_contexts.split(" ")
    mc_contexts = list(set(mc_contexts))
    strandness = _check_strandness_parameter(strandness)
    out_suffix, line_func = _check_out_format_parameter(
        output_format, binarize=binarize
    )

    # because mc_contexts can overlap (e.g. CHN, CAN)
    # each context may be associated with multiple handles
    context_handle = defaultdict(list)
    handle_collect = []
    output_path_collect = {}
    for mc_context in mc_contexts:
        parsed_context_set = parse_mc_pattern(mc_context)
        if strandness == "Split":
            file_path = output_prefix + f".{mc_context}-Watson.{out_suffix}"
            output_path_collect[(mc_context, "Watson", out_suffix)] = file_path
            w_handle = open_allc(file_path, "w")
            handle_collect.append(w_handle)

            file_path = output_prefix + f".{mc_context}-Crick.{out_suffix}"
            output_path_collect[(mc_context, "Crick", out_suffix)] = file_path
            c_handle = open_allc(file_path, "w")
            handle_collect.append(c_handle)

            for mc_pattern in parsed_context_set:
                # handle for Watson/+ strand
                context_handle[(mc_pattern, "+")].append(w_handle)
                # handle for Crick/- strand
                context_handle[(mc_pattern, "-")].append(c_handle)
        else:
            # handle for both strands
            file_path = output_prefix + f".{mc_context}-{strandness}.{out_suffix}"
            if strandness == "MergeTmp":
                output_path_collect[(mc_context, "Merge", out_suffix)] = (
                    output_prefix + f".{mc_context}-Merge.{out_suffix}"
                )
            else:
                output_path_collect[(mc_context, strandness, out_suffix)] = file_path
            _handle = open_allc(file_path, "w")
            handle_collect.append(_handle)
            for mc_pattern in parsed_context_set:
                context_handle[mc_pattern].append(_handle)

    # determine parallel or not
    cpu = int(cpu)
    if parallel:
        print("Parallel extract ALLC")
        if strandness == "MergeTmp":
            strandness = "Merge"
        return _extract_allc_parallel(
            allc_path=allc_path,
            output_prefix=output_prefix,
            mc_contexts=mc_contexts,
            strandness=strandness,
            output_format=output_format,
            chrom_size_path=chrom_size_path,
            cov_cutoff=cov_cutoff,
            cpu=cpu,
            chunk_size=parallel_chunk_size,
            tabix=tabix,
        )

    # split file first
    # strandness function
    with open_allc(allc_path, region=region) as allc:
        if strandness == "Split":
            for line in allc:
                cur_line = line.split("\t")
                if int(cur_line[5]) > cov_cutoff:
                    continue
                try:
                    # key is (context, strand)
                    [
                        h.write(line_func(cur_line))
                        for h in context_handle[(cur_line[3], cur_line[2])]
                    ]
                except KeyError:
                    continue
        else:
            for line in allc:
                cur_line = line.split("\t")
                if int(cur_line[5]) > cov_cutoff:
                    continue
                try:
                    # key is context
                    [h.write(line_func(cur_line)) for h in context_handle[cur_line[3]]]
                except KeyError:
                    continue

    for handle in handle_collect:
        handle.close()

    for mc_context in mc_contexts:
        # tabix ALLC file
        if strandness == "Split":
            in_path = output_prefix + f".{mc_context}-Watson.{out_suffix}"
            if tabix:
                tabix_allc(in_path)

            in_path = output_prefix + f".{mc_context}-Crick.{out_suffix}"
            if tabix:
                tabix_allc(in_path)
        elif strandness == "MergeTmp":
            in_path = output_prefix + f".{mc_context}-{strandness}.{out_suffix}"
            if ("CG" in mc_context) and ("allc" in out_suffix):
                out_path = output_prefix + f".{mc_context}-Merge.{out_suffix}"
                _merge_cg_strand(in_path, out_path)
                run(["rm", "-f", in_path], check=True)
            else:
                # for non-CG, there is no need to merge strand
                out_path = output_prefix + f".{mc_context}-Both.{out_suffix}"
                run(["mv", in_path, out_path], check=True)
            if tabix:
                tabix_allc(out_path)
        else:
            in_path = output_prefix + f".{mc_context}-{strandness}.{out_suffix}"
            if tabix:
                tabix_allc(in_path)

    # return a dict, key is (mc_context, strandness, out_suffix), value is file path
    return output_path_collect
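
# Minimal usage sketch (not part of the original module); the file names below are
# hypothetical placeholders. Extract strand-merged CpG methylation from one ALLC
# file, keeping the ALLC format and using a chrom size file to define the
# extraction order:
#   output_paths = extract_allc(
#       allc_path="cell_001.allc.tsv.gz",
#       output_prefix="cell_001",
#       mc_contexts="CGN",
#       chrom_size_path="mm10.chrom.sizes",
#       strandness="merge",
#       output_format="allc",
#       cpu=1,
#   )
#   # output_paths is a dict keyed by (mc_context, strandness, out_suffix), e.g.
#   # {("CGN", "Merge", "allc.tsv.gz"): "cell_001.CGN-Merge.allc.tsv.gz"}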