Source code for ALLCools._bam_to_allc

"""
This file is modified from methylpy https://github.com/yupenghe/methylpy

Author: Yupeng He
"""

import collections
import logging
import pathlib
import shlex
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd

from ._doc import *
from ._open import open_allc, open_bam
from .utilities import genome_region_chunks

# logger
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
def _read_faidx(faidx_path):
    """
    Read the faidx (.fai) index of a reference fasta file, generated by:

    samtools faidx ref.fa
    """
    return pd.read_csv(
        faidx_path,
        index_col=0,
        header=None,
        sep="\t",
        names=["NAME", "LENGTH", "OFFSET", "LINEBASES", "LINEWIDTH"],
    )
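# Usage sketch for _read_faidx (file names and numbers are illustrative).
# A .fai index is a five-column TSV with one row per sequence, e.g.:
#
#     chr1    248956422    112    60    61
#
# which _read_faidx loads into a DataFrame indexed by NAME:
#
#     fai_df = _read_faidx("ref.fa.fai")
#     fai_df.loc["chr1", "LENGTH"]  # -> 248956422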
def _get_chromosome_sequence_upper(fasta_path, fai_df, query_chrom):
    """Read a whole chromosome sequence into memory."""
    chrom_pointer = fai_df.loc[query_chrom, "OFFSET"]
    tail = fai_df.loc[query_chrom, "LINEBASES"] - fai_df.loc[query_chrom, "LINEWIDTH"]
    seq = ""
    with open(fasta_path, "r") as f:
        f.seek(chrom_pointer)
        for line in f:
            if line[0] == ">":
                break
            seq += line[:tail]  # trim \n
    return seq.upper()
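# A minimal worked example of the seek arithmetic above (values illustrative).
# OFFSET is the byte position of the chromosome's first base in the FASTA, and
# LINEWIDTH = LINEBASES + len(line terminator), so
#
#     tail = LINEBASES - LINEWIDTH   # e.g. 60 - 61 = -1
#     line[:tail]                    # == line[:-1], the line without "\n"
#
# Reading stops at the next ">" header, which marks the following sequence.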
def _get_bam_chrom_index(bam_path):
    result = subprocess.run(
        ["samtools", "idxstats", bam_path], stdout=subprocess.PIPE, encoding="utf8"
    ).stdout

    chrom_set = set()
    for line in result.split("\n"):
        chrom = line.split("\t")[0]
        if chrom not in ["", "*"]:
            chrom_set.add(chrom)
    return pd.Index(chrom_set)
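# For reference, `samtools idxstats` prints one "chrom\tlength\t#mapped\t#unmapped"
# row per sequence plus a final "*" row for unplaced reads (counts illustrative):
#
#     chr1    248956422    107531    598
#     *       0            0         1023
#
# which is why the loop above skips the "" (trailing newline) and "*" entries.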
def _bam_to_allc_worker(
    bam_path,
    reference_fasta,
    fai_df,
    output_path,
    region=None,
    num_upstr_bases=0,
    num_downstr_bases=2,
    buffer_line_number=100000,
    min_mapq=0,
    min_base_quality=1,
    compress_level=5,
    tabix=True,
    save_count_df=False,
):
    """Non-parallel bam_to_allc worker function, called by bam_to_allc."""
    # mpileup
    if region is None:
        mpileup_cmd = (
            f"samtools mpileup -Q {min_base_quality} "
            f"-q {min_mapq} -B -f {reference_fasta} {bam_path}"
        )
        pipes = subprocess.Popen(
            shlex.split(mpileup_cmd),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    else:
        bam_handle = open_bam(
            bam_path,
            region=region,
            mode="r",
            include_header=True,
            samtools_parms_str=None,
        )
        mpileup_cmd = (
            f"samtools mpileup -Q {min_base_quality} "
            f"-q {min_mapq} -B -f {reference_fasta} -"
        )
        pipes = subprocess.Popen(
            shlex.split(mpileup_cmd),
            stdin=bam_handle.file,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )

    result_handle = pipes.stdout
    output_file_handler = open_allc(output_path, mode="w", compresslevel=compress_level)

    # initialize variables
    complement = {"A": "T", "T": "A", "C": "G", "G": "C", "N": "N"}
    mc_sites = {"C", "G"}
    context_len = num_upstr_bases + 1 + num_downstr_bases
    cur_chrom = ""
    line_counts = 0
    total_line = 0
    out = ""
    seq = None  # whole cur_chrom seq
    chr_out_pos_list = []
    cur_out_pos = 0
    cov_dict = collections.defaultdict(int)  # context: cov_total
    mc_dict = collections.defaultdict(int)  # context: mc_total

    # process mpileup result
    for line in result_handle:
        total_line += 1
        fields = line.split("\t")
        fields[2] = fields[2].upper()
        # if chrom changed, read whole chrom seq from fasta
        if fields[0] != cur_chrom:
            cur_chrom = fields[0]
            chr_out_pos_list.append((cur_chrom, str(cur_out_pos)))
            # get seq for cur_chrom
            seq = _get_chromosome_sequence_upper(reference_fasta, fai_df, cur_chrom)

        if fields[2] not in mc_sites:
            continue

        # indels
        read_bases = fields[4]
        incons_basecalls = read_bases.count("+") + read_bases.count("-")
        if incons_basecalls > 0:
            read_bases_no_indel = ""
            index = 0
            prev_index = 0
            while index < len(read_bases):
                if read_bases[index] == "+" or read_bases[index] == "-":
                    # get indel size
                    indel_size = ""
                    ind = index + 1
                    while True:
                        try:
                            int(read_bases[ind])
                            indel_size += read_bases[ind]
                            ind += 1
                        except (ValueError, IndexError):
                            break
                    try:
                        # sometimes +/- is not followed by a number and
                        # should be ignored
                        indel_size = int(indel_size)
                    except ValueError:
                        index += 1
                        continue
                    read_bases_no_indel += read_bases[prev_index:index]
                    index = ind + indel_size
                    prev_index = index
                else:
                    index += 1
            read_bases_no_indel += read_bases[prev_index:index]
            fields[4] = read_bases_no_indel

        # count converted and unconverted bases
        if fields[2] == "C":
            pos = int(fields[1]) - 1
            try:
                context = seq[(pos - num_upstr_bases) : (pos + num_downstr_bases + 1)]
            except Exception:
                # complete context is not available, skip
                continue
            unconverted_c = fields[4].count(".")
            converted_c = fields[4].count("T")
            cov = unconverted_c + converted_c
            if cov > 0 and len(context) == context_len:
                line_counts += 1
                data = (
                    "\t".join(
                        [
                            cur_chrom,
                            str(pos + 1),
                            "+",
                            context,
                            str(unconverted_c),
                            str(cov),
                            "1",
                        ]
                    )
                    + "\n"
                )
                cov_dict[context] += cov
                mc_dict[context] += unconverted_c
                out += data
                cur_out_pos += len(data)
        elif fields[2] == "G":
            pos = int(fields[1]) - 1
            try:
                context = "".join(
                    [
                        complement[base]
                        for base in reversed(
                            seq[(pos - num_downstr_bases) : (pos + num_upstr_bases + 1)]
                        )
                    ]
                )
            except Exception:
                # complete context is not available, skip
                continue
            unconverted_c = fields[4].count(",")
            converted_c = fields[4].count("a")
            cov = unconverted_c + converted_c
            if cov > 0 and len(context) == context_len:
                line_counts += 1
                data = (
                    "\t".join(
                        [
                            cur_chrom,
                            str(pos + 1),
                            "-",
                            context,
                            str(unconverted_c),
                            str(cov),
                            "1",
                        ]
                    )
                    + "\n"
                )
                cov_dict[context] += cov
                mc_dict[context] += unconverted_c
                out += data
                cur_out_pos += len(data)

        if line_counts > buffer_line_number:
            output_file_handler.write(out)
            line_counts = 0
            out = ""

    if line_counts > 0:
        output_file_handler.write(out)
    result_handle.close()
    output_file_handler.close()

    if tabix:
        subprocess.run(shlex.split(f"tabix -b 2 -e 2 -s 1 {output_path}"), check=True)

    count_df = pd.DataFrame({"mc": mc_dict, "cov": cov_dict})
    count_df["mc_rate"] = count_df["mc"] / count_df["cov"]

    total_genome_length = fai_df["LENGTH"].sum()
    count_df["genome_cov"] = total_line / total_genome_length

    if save_count_df:
        count_df.to_csv(output_path + ".count.csv")
        return None
    else:
        return count_df
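# Each buffered `data` record above is one ALLC row: a tab-separated line of
# chromosome, 1-based position, strand, context, methylated count (mc), total
# coverage (cov), and a methylated flag that this function always sets to 1.
# An illustrative CpG row with num_upstr_bases=0, num_downstr_bases=2:
#
#     chr1    3000827    +    CGA    2    3    1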
def _aggregate_count_df(count_dfs):
    total_df = pd.concat(count_dfs)
    total_df = total_df.groupby(total_df.index).sum()
    total_df["mc_rate"] = total_df["mc"] / total_df["cov"]
    total_df["mc"] = total_df["mc"].astype(int)
    total_df["cov"] = total_df["cov"].astype(int)
    return total_df
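# A minimal sketch of what _aggregate_count_df does with two worker results
# (toy values, for illustration only): rows sharing a context index are summed,
# then mc_rate is recomputed from the summed counts.
#
#     df1 = pd.DataFrame({"mc": {"CGA": 2}, "cov": {"CGA": 4}})
#     df2 = pd.DataFrame({"mc": {"CGA": 1}, "cov": {"CGA": 6}})
#     _aggregate_count_df([df1, df2])
#     #      mc  cov  mc_rate
#     # CGA   3   10      0.3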
@doc_params(
    compress_level_doc=compress_level_doc,
    cpu_basic_doc=cpu_basic_doc,
    reference_fasta_doc=reference_fasta_doc,
)
def bam_to_allc(
    bam_path,
    reference_fasta,
    output_path=None,
    cpu=1,
    num_upstr_bases=0,
    num_downstr_bases=2,
    min_mapq=10,
    min_base_quality=20,
    compress_level=5,
    save_count_df=False,
):
    """\
    Generate 1 ALLC file from 1 position-sorted BAM file via samtools mpileup.

    Parameters
    ----------
    bam_path
        Path to 1 position-sorted BAM file
    reference_fasta
        {reference_fasta_doc}
    output_path
        Path to 1 output ALLC file
    cpu
        {cpu_basic_doc}
        DO NOT use cpu > 1 for single-cell ALLC generation;
        parallelizing at the cell level is better for single-cell projects.
    num_upstr_bases
        Number of upstream base(s) of the C base to include in the ALLC context column,
        usually 0 for normal BS-seq, 1 for NOMe-seq.
    num_downstr_bases
        Number of downstream base(s) of the C base to include in the ALLC context column,
        usually 2 for both BS-seq and NOMe-seq.
    min_mapq
        Minimum MAPQ for a read to be considered, a samtools mpileup parameter,
        see samtools documentation.
    min_base_quality
        Minimum base quality for a base to be considered, a samtools mpileup parameter,
        see samtools documentation.
    compress_level
        {compress_level_doc}
    save_count_df
        If True, save an ALLC context count table next to the ALLC file.

    Returns
    -------
    count_df
        a pandas.DataFrame of overall mC and cov counts separated by mC context.
    """
    buffer_line_number = 100000
    tabix = True

    # Check fasta index
    if not pathlib.Path(reference_fasta + ".fai").exists():
        raise FileNotFoundError(
            "Reference fasta not indexed. Use samtools faidx to index it and run again."
        )
    fai_df = _read_faidx(pathlib.Path(reference_fasta + ".fai"))

    if not pathlib.Path(bam_path + ".bai").exists():
        subprocess.check_call(shlex.split("samtools index " + bam_path))

    # check that chromosomes match between BAM and FASTA;
    # samtools has a bug when chromosomes do not match
    bam_chroms_index = _get_bam_chrom_index(bam_path)
    unknown_chroms = [i for i in bam_chroms_index if i not in fai_df.index]
    if len(unknown_chroms) != 0:
        unknown_chroms = " ".join(unknown_chroms)
        raise IndexError(
            f"BAM file contains unknown chromosomes: {unknown_chroms}\n"
            "Make sure you use the same genome FASTA file for mapping and bam-to-allc."
        )

    # if parallel, chunk genome
    if cpu > 1:
        regions = genome_region_chunks(
            reference_fasta + ".fai", bin_length=100000000, combine_small=False
        )
    else:
        regions = None

    # Output path
    input_path = pathlib.Path(bam_path)
    file_dir = input_path.parent
    if output_path is None:
        allc_name = "allc_" + input_path.name.split(".")[0] + ".tsv.gz"
        output_path = str(file_dir / allc_name)
    else:
        if not output_path.endswith(".gz"):
            output_path += ".gz"

    if cpu > 1:
        raise NotImplementedError
        temp_out_paths = []
        for batch_id, region in enumerate(regions):
            temp_out_paths.append(output_path + f".batch_{batch_id}.tmp.tsv.gz")

        with ProcessPoolExecutor(max_workers=cpu) as executor:
            future_dict = {}
            for batch_id, (region, out_temp_path) in enumerate(
                zip(regions, temp_out_paths)
            ):
                _kwargs = dict(
                    bam_path=bam_path,
                    reference_fasta=reference_fasta,
                    fai_df=fai_df,
                    output_path=out_temp_path,
                    region=region,
                    num_upstr_bases=num_upstr_bases,
                    num_downstr_bases=num_downstr_bases,
                    buffer_line_number=buffer_line_number,
                    min_mapq=min_mapq,
                    min_base_quality=min_base_quality,
                    compress_level=compress_level,
                    tabix=False,
                    save_count_df=False,
                )
                future_dict[executor.submit(_bam_to_allc_worker, **_kwargs)] = batch_id

            count_dfs = []
            for future in as_completed(future_dict):
                batch_id = future_dict[future]
                try:
                    count_dfs.append(future.result())
                except Exception as exc:
                    log.info("%r generated an exception: %s" % (batch_id, exc))

        # aggregate ALLC
        with open_allc(
            output_path,
            mode="w",
            compresslevel=compress_level,
            threads=1,
            region=None,
        ) as out_f:
            # TODO: parallel ALLC chunks can overlap: the split-by-region strategy
            # only splits reads, but reads can span region boundaries, so end rows
            # need to be adjusted and merged when aggregating ALLC
            for out_temp_path in temp_out_paths:
                with open_allc(out_temp_path) as f:
                    for line in f:
                        out_f.write(line)
                subprocess.check_call(["rm", "-f", out_temp_path])

        # tabix
        if tabix:
            subprocess.run(
                shlex.split(f"tabix -b 2 -e 2 -s 1 {output_path}"), check=True
            )

        # aggregate count_df
        count_df = _aggregate_count_df(count_dfs)
        if save_count_df:
            count_df.to_csv(output_path + ".count.csv")
        return count_df
    else:
        return _bam_to_allc_worker(
            bam_path,
            reference_fasta,
            fai_df,
            output_path,
            region=None,
            num_upstr_bases=num_upstr_bases,
            num_downstr_bases=num_downstr_bases,
            buffer_line_number=buffer_line_number,
            min_mapq=min_mapq,
            min_base_quality=min_base_quality,
            compress_level=compress_level,
            tabix=tabix,
            save_count_df=save_count_df,
        )
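# End-to-end usage sketch (hypothetical paths, for illustration only); the BAM
# must be position-sorted and the reference FASTA indexed with samtools faidx:
#
#     count_df = bam_to_allc(
#         bam_path="cell_001.sorted.bam",
#         reference_fasta="mm10.fa",
#         output_path="allc_cell_001.tsv.gz",
#         cpu=1,  # keep cpu=1 for single-cell data; parallelize across cells instead
#     )
#     count_df.head()  # per-context mc/cov counts and mc_rate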