Source code for ALLCools.utilities

import collections
import functools
import gzip
import itertools
import os
import pathlib
import shlex
from collections import defaultdict
from functools import partial
from subprocess import run, PIPE, CalledProcessError
from typing import Union, List

import numpy as np
import pandas as pd

from ._doc import *
from ._open import open_allc

IUPAC_TABLE = {
    "A": "A",
    "T": "T",
    "C": "C",
    "G": "G",
    "R": "AG",
    "Y": "CT",
    "S": "GC",
    "W": "AT",
    "K": "GT",
    "M": "AC",
    "B": "CGT",
    "D": "AGT",
    "H": "ATC",
    "V": "ACG",
    "N": "ATCGN",
}
COMPLIMENT_BASE = {
    "A": "T",
    "C": "G",
    "T": "A",
    "G": "C",
    "a": "t",
    "c": "g",
    "t": "a",
    "g": "c",
    "N": "N",
    "n": "n",
}
def reverse_complement(seq):
    return "".join(map(lambda i: COMPLIMENT_BASE[i], seq[::-1]))
def get_allc_chroms(allc_path):
    """Return the list of chromosomes present in a tabix-indexed ALLC file."""
    p = run(
        ["tabix", "-l", allc_path],
        check=True,
        stderr=PIPE,
        stdout=PIPE,
        encoding="utf8",
    )
    return p.stdout.strip("\n").split("\n")
@functools.lru_cache(maxsize=100)
def parse_mc_pattern(pattern: str) -> set:
    """Parse an mC context pattern (IUPAC codes) into the set of concrete contexts it matches."""
    # IUPAC DNA abbr. table
    all_pos_list = []
    pattern = pattern.upper()
    for base in pattern:
        try:
            all_pos_list.append(IUPAC_TABLE[base])
        except KeyError:
            raise KeyError(f"Base {base} is not in IUPAC table.")
    context_set = {"".join(i) for i in itertools.product(*all_pos_list)}
    return context_set
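
# Hedged usage sketch (illustrative addition, not part of the original module):
# an IUPAC pattern is expanded position by position, so the result is the
# Cartesian product of the allowed bases at each position.
def _example_parse_mc_pattern():
    # "CGN": C x G x {A, T, C, G, N} -> 5 concrete three-base contexts.
    assert parse_mc_pattern("CGN") == {"CGA", "CGT", "CGC", "CGG", "CGN"}
    # "CH" (non-CpG): C x {A, T, C} -> 3 two-base contexts.
    assert parse_mc_pattern("CH") == {"CA", "CT", "CC"}
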
@functools.lru_cache(maxsize=10)
def parse_chrom_size(path, remove_chr_list=None):
    """
    Parse a simple UCSC chrom size file, or a .fai index whose first two columns
    match the chrom size format, and return an ordered chrom -> length dict.
    """
    if remove_chr_list is None:
        remove_chr_list = []
    with open(path) as f:
        chrom_dict = collections.OrderedDict()
        for line in f:
            # *_ allows extra columns, e.g. from a .fai (faidx) file
            chrom, length, *_ = line.strip("\n").split("\t")
            if chrom in remove_chr_list:
                continue
            chrom_dict[chrom] = int(length)
    return chrom_dict
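
# Hedged usage sketch (illustrative addition, not part of the original module):
# a UCSC chrom size file is a tab-separated table of chromosome name and length.
# The file name below is hypothetical and written here only to show the format.
def _example_parse_chrom_size(tmp_path="example.chrom.sizes"):
    with open(tmp_path, "w") as f:
        f.write("chr1\t248956422\nchrM\t16569\n")
    # remove_chr_list must be hashable because parse_chrom_size is lru_cached.
    chrom_dict = parse_chrom_size(tmp_path, remove_chr_list=("chrM",))
    assert chrom_dict == {"chr1": 248956422}
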
def chrom_dict_to_id_index(chrom_dict, bin_size):
    """Assign each chromosome the starting offset of its bins in a genome-wide bin index."""
    sum_id = 0
    index_dict = {}
    for chrom, chrom_length in chrom_dict.items():
        index_dict[chrom] = sum_id
        sum_id += chrom_length // bin_size + 1
    return index_dict
def get_bin_id(chrom, chrom_index_dict, bin_start, bin_size) -> int:
    chrom_index_start = chrom_index_dict[chrom]
    n_bin = bin_start // bin_size
    return chrom_index_start + n_bin
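
# Hedged usage sketch (illustrative addition, not part of the original module):
# chrom_dict_to_id_index assigns each chromosome a starting offset in a
# genome-wide bin numbering, and get_bin_id turns a (chrom, position) pair
# into that global bin index. The two-chromosome genome below is hypothetical.
def _example_global_bin_index():
    chrom_dict = {"chr1": 3000, "chr2": 2500}
    index_dict = chrom_dict_to_id_index(chrom_dict, bin_size=1000)
    # chr1 occupies bins 0-3 (3000 // 1000 + 1 = 4 bins), so chr2 starts at 4.
    assert index_dict == {"chr1": 0, "chr2": 4}
    # Position 2100 on chr2 falls in local bin 2, i.e. global bin 4 + 2 = 6.
    assert get_bin_id("chr2", index_dict, bin_start=2100, bin_size=1000) == 6
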
def genome_region_chunks(
    chrom_size_path: str, bin_length: int = 10000000, combine_small: bool = True
) -> List[str]:
    """
    Split the whole genome into bins of {bin_length} bp, used for tabix region queries.

    Parameters
    ----------
    chrom_size_path
        Path of UCSC genome size file
    bin_length
        Length of each bin
    combine_small
        Whether to combine small regions into one record

    Returns
    -------
    List of records in tabix query format
    """
    chrom_size_dict = parse_chrom_size(chrom_size_path)

    cur_chrom_pos = 0
    records = []
    record_lengths = []
    for chrom, chrom_length in chrom_size_dict.items():
        while cur_chrom_pos + bin_length <= chrom_length:
            # tabix region is 1 based and inclusive
            records.append(f"{chrom}:{cur_chrom_pos}-{cur_chrom_pos + bin_length - 1}")
            cur_chrom_pos += bin_length
            record_lengths.append(bin_length)
        else:
            # the remaining (partial) bin of this chromosome
            records.append(f"{chrom}:{cur_chrom_pos}-{chrom_length}")
            record_lengths.append(chrom_length - cur_chrom_pos)
            cur_chrom_pos = 0

    # merge small records (when bin is larger than chrom length)
    final_records = []
    if combine_small:
        temp_records = []
        cum_length = 0
        for record, record_length in zip(records, record_lengths):
            temp_records.append(record)
            cum_length += record_length
            if cum_length >= bin_length:
                final_records.append(" ".join(temp_records))
                temp_records = []
                cum_length = 0
        if len(temp_records) != 0:
            final_records.append(" ".join(temp_records))
    else:
        for record in records:
            final_records.append(record)
    return final_records
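
# Hedged usage sketch (illustrative addition, not part of the original module):
# each chromosome is cut into bin_length chunks in tabix "chrom:start-end"
# query format; the last, shorter piece of each chromosome becomes its own
# record. The chrom size file name and contents below are hypothetical.
def _example_genome_region_chunks(chrom_size_path="toy.chrom.sizes"):
    with open(chrom_size_path, "w") as f:
        f.write("chr1\t25000\nchr2\t8000\n")
    regions = genome_region_chunks(
        chrom_size_path, bin_length=10000, combine_small=False
    )
    # chr1 yields two full 10 kb bins plus a 5 kb remainder; chr2 is shorter
    # than bin_length, so it becomes a single record.
    assert regions == [
        "chr1:0-9999",
        "chr1:10000-19999",
        "chr1:20000-25000",
        "chr2:0-8000",
    ]
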
def parse_file_paths(input_file_paths: Union[str, list]) -> list:
    if isinstance(input_file_paths, list) and (len(input_file_paths) == 1):
        input_file_paths = input_file_paths[0]

    if isinstance(input_file_paths, str):
        if "*" in input_file_paths:
            import glob

            file_list = glob.glob(input_file_paths)
        else:
            # a plain text file containing one path per line
            file_list = []
            with open(input_file_paths) as f:
                for line in f:
                    file_list.append(line.strip("\n"))
        _file_list = file_list
    elif isinstance(input_file_paths, list):
        _file_list = input_file_paths
    else:
        raise TypeError("File paths input is neither str nor list.")

    final_file_list = []
    for path in _file_list:
        real_path = pathlib.Path(path).resolve()
        if not real_path.exists():
            raise FileNotFoundError(f"{path} does not exist.")
        final_file_list.append(str(real_path))
    return final_file_list
def get_md5(file_path):
    file_md5 = run(
        shlex.split(f"md5sum {file_path}"), stdout=PIPE, encoding="utf8", check=True
    ).stdout
    file_md5 = file_md5.split(" ")[0]
    return file_md5
def check_tbi_chroms(file_path, genome_dict, same_order=False):
    file_tabix_path = pathlib.Path(str(file_path) + ".tbi")
    if not file_tabix_path.exists():
        print(f"{file_path} does not have a .tbi index. Use tabix to index it.")
        return False

    tbi_time = os.path.getmtime(file_tabix_path)
    file_time = os.path.getmtime(file_path)
    if file_time > tbi_time:
        # the tabix index is older than the ALLC file, so the ALLC may have changed
        return False

    try:
        chroms = (
            run(["tabix", "-l", file_path], stdout=PIPE, encoding="utf8", check=True)
            .stdout.strip()
            .split("\n")
        )
        if len(set(chroms) - genome_dict.keys()) != 0:
            return False

        if same_order:
            ref_order = list(genome_dict.keys())
            cur_pos = -1
            for chrom in chroms:
                chrom_pos = ref_order.index(chrom)
                if chrom_pos < cur_pos:
                    return False
                else:
                    cur_pos = chrom_pos
    except CalledProcessError:
        return False
    return True
def generate_chrom_bin_bed_dataframe(
    chrom_size_path: str, window_size: int, step_size: int = None
) -> pd.DataFrame:
    """
    Generate a BED format dataframe based on a UCSC chrom size file and window_size.

    The returned dataframe contains 3 columns: chrom, bin_start, bin_end.
    The index is a 0-based continuous bin index.
    """
    if step_size is None:
        step_size = window_size
    chrom_size_dict = parse_chrom_size(chrom_size_path)
    records = []
    for chrom, chrom_length in chrom_size_dict.items():
        bin_start = np.array(list(range(0, chrom_length, step_size)))
        bin_end = bin_start + window_size
        # clip the last window to the chromosome end
        bin_end[np.where(bin_end > chrom_length)] = chrom_length
        chrom_df = pd.DataFrame(dict(bin_start=bin_start, bin_end=bin_end))
        chrom_df["chrom"] = chrom
        records.append(chrom_df)
    total_df = pd.concat(records)[["chrom", "bin_start", "bin_end"]].reset_index(
        drop=True
    )
    return total_df
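
# Hedged usage sketch (illustrative addition, not part of the original module):
# non-overlapping 1 kb windows over a hypothetical 2.5 kb chromosome; the last
# window is clipped to the chromosome end. The file name below is hypothetical.
def _example_generate_chrom_bin_bed_dataframe(chrom_size_path="demo.chrom.sizes"):
    with open(chrom_size_path, "w") as f:
        f.write("chr1\t2500\n")
    bed_df = generate_chrom_bin_bed_dataframe(chrom_size_path, window_size=1000)
    # Three bins: [0, 1000), [1000, 2000), and the clipped [2000, 2500).
    assert bed_df["bin_start"].tolist() == [0, 1000, 2000]
    assert bed_df["bin_end"].tolist() == [1000, 2000, 2500]
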
@doc_params(allc_path_doc=allc_path_doc)
def profile_allc(allc_path, drop_n=True, n_rows=1000000, output_path=None):
    """\
    Generate summary statistics for one ALLC file. 1e8 rows finish in about 5 min.

    Parameters
    ----------
    allc_path
        {allc_path_doc}
    drop_n
        Whether to drop contexts that contain N, such as CCN.
        These are usually very rare and need to be dropped.
    n_rows
        Number of rows to calculate the profile from.
        The default number is usually sufficient to get a precise estimate.
    output_path
        Path of the output file. If None, the profile dataframe is returned
        instead of being written to disk.

    Returns
    -------
    pd.DataFrame of the profile if output_path is None, otherwise None.
    """
    # Find best default value
    if "gz" in allc_path:
        opener = partial(gzip.open, mode="rt")
    else:
        opener = partial(open, mode="r")

    # initialize count dicts
    mc_sum_dict = defaultdict(int)
    cov_sum_dict = defaultdict(int)
    cov_sum2_dict = defaultdict(int)  # sum of squares, for calculating variance
    rate_sum_dict = defaultdict(float)
    rate_sum2_dict = defaultdict(float)  # sum of squares, for calculating variance
    context_count_dict = defaultdict(int)
    with opener(allc_path) as f:
        n = 0
        for line in f:
            chrom, pos, strand, context, mc, cov, p = line.split("\t")
            if drop_n and "N" in context:
                continue
            # mc and cov
            mc_sum_dict[context] += int(mc)
            cov_sum_dict[context] += int(cov)
            cov_sum2_dict[context] += int(cov) ** 2
            # raw base rate
            rate = int(mc) / int(cov)
            rate_sum_dict[context] += rate
            rate_sum2_dict[context] += rate ** 2
            # finally, count the context
            context_count_dict[context] += 1
            n += 1
            if (n_rows is not None) and (n >= n_rows):
                break

    # overall count
    profile_df = pd.DataFrame({"partial_mc": mc_sum_dict, "partial_cov": cov_sum_dict})
    profile_df["base_count"] = pd.Series(context_count_dict)
    profile_df["overall_mc_rate"] = profile_df["partial_mc"] / profile_df["partial_cov"]

    # cov base mean and base std.
    # assume that base cov follows a normal distribution
    cov_sum_series = pd.Series(cov_sum_dict)
    cov_sum2_series = pd.Series(cov_sum2_dict)
    profile_df["base_cov_mean"] = cov_sum_series / profile_df["base_count"]
    profile_df["base_cov_std"] = np.sqrt(
        (cov_sum2_series / profile_df["base_count"]) - profile_df["base_cov_mean"] ** 2
    )

    # assume that the base rate follows a beta distribution,
    # so the observed rate follows a joint distribution of beta (rate) and normal (cov).
    # here we use the observed base_rate_mean and base_rate_var to calculate
    # approximate alpha and beta values for the base rate beta distribution
    rate_sum_series = pd.Series(rate_sum_dict)
    rate_sum2_series = pd.Series(rate_sum2_dict)
    profile_df["base_rate_mean"] = rate_sum_series / profile_df["base_count"]
    profile_df["base_rate_var"] = (
        rate_sum2_series / profile_df["base_count"]
    ) - profile_df["base_rate_mean"] ** 2

    # based on the beta distribution mean and var:
    # a / (a + b) = base_rate_mean
    # a * b / ((a + b) ** 2 * (a + b + 1)) = base_rate_var
    # we have:
    a = (1 - profile_df["base_rate_mean"]) * (
        profile_df["base_rate_mean"] ** 2
    ) / profile_df["base_rate_var"] - profile_df["base_rate_mean"]
    b = a * (1 / profile_df["base_rate_mean"] - 1)
    profile_df["base_beta_a"] = a
    profile_df["base_beta_b"] = b

    if output_path is not None:
        profile_df.to_csv(output_path, sep="\t")
        return None
    else:
        return profile_df
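
# Hedged worked example (illustrative addition, not part of the original module):
# profile_allc moment-matches a Beta(a, b) distribution to the observed per-base
# methylation rates. From mean m = a / (a + b) and
# var v = a * b / ((a + b)**2 * (a + b + 1)) one can solve
#   a = (1 - m) * m**2 / v - m    and    b = a * (1 / m - 1),
# which is what the function computes column-wise. The numbers below are
# hypothetical observed moments used only to check the algebra.
def _example_beta_moment_matching():
    m, v = 0.75, 0.01
    a = (1 - m) * m ** 2 / v - m
    b = a * (1 / m - 1)
    # recover the moments from the fitted parameters
    mean = a / (a + b)
    var = a * b / ((a + b) ** 2 * (a + b + 1))
    assert abs(mean - m) < 1e-9 and abs(var - v) < 1e-9
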
def is_gz_file(filepath):
    """
    Check whether a file is gzip compressed (bgzip files also return True).

    Adapted from https://stackoverflow.com/questions/3703276/how-to-tell-if-a-file-is-gzip-compressed
    """
    with open(filepath, "rb") as test_f:
        # gzip (and bgzip) files start with the two magic bytes 1f 8b
        return test_f.read(2) == b"\x1f\x8b"
@doc_params(allc_path_doc=allc_path_doc)
def tabix_allc(allc_path, reindex=False):
    """
    A simple wrapper of the tabix command to index one ALLC file.

    Parameters
    ----------
    allc_path
        {allc_path_doc}
    reindex
        If True, force regeneration of the ALLC index.

    Returns
    -------
    """
    if not is_gz_file(allc_path):
        raise ValueError(
            f"{allc_path} does not appear to be a bgzip compressed file. "
            f"Please make sure the ALLC file is compressed by bgzip and "
            f'has a tabix index ".tbi" associated with it. If not, you may run bgzip/tabix yourself '
            f'or use the "allcools standard" command to check, compress, and tabix your file.'
        )

    if os.path.exists(f"{allc_path}.tbi") and not reindex:
        return

    # ALLC columns: 1 = chrom (-s 1), 2 = pos, used as both start and end (-b 2 -e 2)
    run(shlex.split(f"tabix -f -b 2 -e 2 -s 1 {allc_path}"), check=True)
    return
@doc_params(
    allc_path_doc=allc_path_doc,
    chrom_size_path_doc=chrom_size_path_doc,
    compress_level_doc=compress_level_doc,
    remove_additional_chrom_doc=remove_additional_chrom_doc,
)
def standardize_allc(
    allc_path, chrom_size_path, compress_level=5, remove_additional_chrom=False
):
    """\
    Standardize one ALLC file by checking that:

    1. The ALLC file has no header;
    2. Chromosome names in the ALLC are the same as those in the chrom_size_path file, including "chr";
    3. The output file is bgzipped with a .tbi index;
    4. Additional chromosomes are removed (remove_additional_chrom=True),
       or a KeyError is raised when an unknown chromosome is found (default).

    Parameters
    ----------
    allc_path
        {allc_path_doc}
    chrom_size_path
        {chrom_size_path_doc}
    compress_level
        {compress_level_doc}
    remove_additional_chrom
        {remove_additional_chrom_doc}

    Returns
    -------
    """
    # first check allc tabix and chrom order
    genome_dict = parse_chrom_size(chrom_size_path)
    if check_tbi_chroms(allc_path, genome_dict):
        # tabix exists, is younger than the ALLC, and the chrom order matches the genome dict,
        # so the ALLC is already standard
        return

    # TODO add an option for enforcing chrom order or not
    # try bgzip and tabix first; if that succeeds, the file is at least sorted.
    # if it fails, sort the file first, then bgzip and tabix.

    # read allc by chrom, write to a final tmp file,
    # tabix the final tmp, remove everything else
    if "chr1" in genome_dict:
        raw_add_chr = True
    else:
        raw_add_chr = False

    with open_allc(allc_path) as f, open_allc(
        allc_path + ".tmp.gz", mode="w", compresslevel=compress_level
    ) as wf:
        cur_chrom = "TOTALLY_NOT_A_CHROM"
        cur_start = cur_chrom + "\t"
        cur_pointer = 0
        index_lines = []
        buffer_lines = ""
        line_count = 0
        add_chr = raw_add_chr
        for line in f:
            if line_count == 0:
                # for very old allc files, which contain a header line
                ll = line.split("\t")
                try:
                    int(ll[1])  # pos
                    int(ll[4])  # mc
                    int(ll[5])  # cov
                except ValueError:
                    # the first line is a header, remove it
                    continue
            if line_count < 2:
                # the 1st line could be a header that starts with chr,
                # so check both the 1st and 2nd rows
                if line.startswith("chr"):
                    add_chr = False
            if add_chr:
                line = "chr" + line
            if not line.startswith(cur_start):
                fields = line.split("\t")
                cur_chrom = fields[0]
                if cur_chrom not in genome_dict:
                    if not remove_additional_chrom:
                        raise KeyError(
                            f"{cur_chrom} does not exist in the genome size file, "
                            f"set remove_additional_chrom=True to remove additional chroms"
                        )
                    else:
                        # skip unknown chromosome
                        continue
                index_lines.append(cur_chrom + "\t" + str(cur_pointer) + "\n")
                cur_start = cur_chrom + "\t"
            cur_pointer += len(line)
            buffer_lines += line
            line_count += 1
            if line_count % 50000 == 0:
                wf.write(buffer_lines)
                buffer_lines = ""
        wf.write(buffer_lines)

    run(shlex.split(f"mv {allc_path} {allc_path}.bp"), check=True)
    run(shlex.split(f"mv {allc_path}.tmp.gz {allc_path}"), check=True)
    run(shlex.split(f"rm -f {allc_path}.bp"), check=True)
    tabix_allc(allc_path, reindex=True)
    return
def _transfer_bin_size(bin_size: int) -> str:
    """Get a proper str for a large bin_size"""
    if bin_size > 1000000:
        bin_size_mode = bin_size % 1000000
        bin_size_mode = (
            f"{bin_size_mode / 1000000:.1f}"[1:] if bin_size_mode >= 100000 else ""
        )
        bin_size_str = f"{bin_size // 1000000}{bin_size_mode}m"
    elif bin_size > 1000:
        bin_size_mode = bin_size % 1000
        bin_size_mode = (
            f"{bin_size_mode / 1000:.1f}"[1:] if bin_size_mode >= 100 else ""
        )
        bin_size_str = f"{bin_size // 1000}{bin_size_mode}k"
    else:
        bin_size_str = f"{bin_size}"
    return bin_size_str
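
# Hedged usage sketch (illustrative addition, not part of the original module):
# bin sizes are shortened to a human-readable suffix, keeping one decimal only
# when the remainder is large enough to matter.
def _example_transfer_bin_size():
    assert _transfer_bin_size(100000) == "100k"  # exact kb
    assert _transfer_bin_size(2500) == "2.5k"    # kb with remainder
    assert _transfer_bin_size(5000000) == "5m"   # exact Mb
    assert _transfer_bin_size(2500000) == "2.5m" # Mb with remainder
    assert _transfer_bin_size(500) == "500"      # below 1 kb, unchanged
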
def parse_dtype(dtype):
    if isinstance(dtype, str):
        dtype_map = {
            "uint8": np.uint8,
            "uint16": np.uint16,
            "uint32": np.uint32,
            "uint64": np.uint64,
            "int8": np.int8,
            "int16": np.int16,
            "int32": np.int32,
            "int64": np.int64,
            "bool": np.bool_,
        }
        try:
            return dtype_map[dtype]
        except KeyError:
            raise ValueError(f"Unknown dtype {dtype}")
    else:
        return dtype
def binary_count(mc, cov):
    """Binarize a single site: (0, 1) if unmethylated, (1, 1) if fully methylated, (0, 0) otherwise."""
    if mc == 0:
        return 0, 1
    elif mc == cov:
        return 1, 1
    else:
        return 0, 0