Source code for ALLCools._allc_to_bigwig_new

import pyBigWig
import pysam
from collections import defaultdict
from ._doc import *
from .utilities import parse_mc_pattern, parse_chrom_size


[docs]class ContextCounter: def __init__(self, mc_contexts): self.mc_counts = defaultdict(float) # key is context pattern like CHN, CGN self.cov_counts = defaultdict(float) # key is context pattern like CHN, CGN # key is each single context like CCC, CGC, value is a list of context pattern this context belong to. # A single context may belong to multiple patterns self.context_map = defaultdict(list) for context in mc_contexts: parsed_context_set = parse_mc_pattern(context) for c in parsed_context_set: self.context_map[c].append(context)
[docs] def add(self, context, mc, cov): for c in self.context_map[context]: # print(context, c, mc, cov) self.mc_counts[c] += mc self.cov_counts[c] += cov
[docs]class StrandContextCounter: def __init__(self, mc_contexts): self.mc_watson_counts = defaultdict( float ) # key is context pattern like CHN, CGN self.cov_watson_counts = defaultdict( float ) # key is context pattern like CHN, CGN self.mc_crick_counts = defaultdict( float ) # key is context pattern like CHN, CGN self.cov_crick_counts = defaultdict( float ) # key is context pattern like CHN, CGN # key is each single context like CCC, CGC, value is a list of context pattern this context belong to. # A single context may belong to multiple patterns self.context_map = defaultdict(list) for context in mc_contexts: parsed_context_set = parse_mc_pattern(context) for c in parsed_context_set: self.context_map[c].append(context)
[docs] def add(self, context, strand, mc, cov): # print(context, strand, mc, cov) for c in self.context_map[context]: if strand == "+": self.mc_watson_counts[c] += mc self.cov_watson_counts[c] += cov else: self.mc_crick_counts[c] += mc self.cov_crick_counts[c] += cov
[docs]def write_entry( counter, context_handle, mc_contexts, strandness, chrom, bin_start, bin_size ): if strandness == "split": for context in mc_contexts: # Watson strand mc = counter.mc_watson_counts[context] cov = counter.cov_watson_counts[context] if cov != 0: frac = mc / cov frac_handle = context_handle[context, "+", "frac"] frac_handle.addEntries( chrom, starts=[bin_start], values=[frac], span=bin_size ) cov_handle = context_handle[context, "+", "cov"] cov_handle.addEntries( chrom, starts=[bin_start], values=[cov], span=bin_size ) # Crick strand mc = counter.mc_crick_counts[context] cov = counter.cov_crick_counts[context] if cov != 0: frac = mc / cov frac_handle = context_handle[context, "-", "frac"] frac_handle.addEntries( chrom, starts=[bin_start], values=[frac], span=bin_size ) cov_handle = context_handle[context, "-", "cov"] cov_handle.addEntries( chrom, starts=[bin_start], values=[cov], span=bin_size ) else: for context in mc_contexts: # Both strand mc = counter.mc_counts[context] cov = counter.cov_counts[context] if cov != 0: frac = mc / cov frac_handle = context_handle[context, "frac"] frac_handle.addEntries( chrom, starts=[bin_start], values=[frac], span=bin_size ) cov_handle = context_handle[context, "cov"] cov_handle.addEntries( chrom, starts=[bin_start], values=[cov], span=bin_size ) return
@doc_params( allc_path_doc=allc_path_doc, mc_contexts_doc=mc_contexts_doc, chrom_size_path_doc=chrom_size_path_doc, strandness_doc=strandness_doc, bw_bin_sizes_doc=bw_bin_sizes_doc,
[docs]) def allc_to_bigwig( allc_path, output_prefix, bin_size, mc_contexts, chrom_size_path, strandness ): """\ Generate BigWig files from one ALLC file. Parameters ---------- allc_path {allc_path_doc} output_prefix Path prefix of the output BigWig file. bin_size {bw_bin_sizes_doc} mc_contexts {mc_contexts_doc} strandness {strandness_doc} chrom_size_path {chrom_size_path_doc} If chrom_size_path provided, will use it to extract ALLC with chrom order, but if region provided, will ignore this. """ if strandness not in {"split", "both"}: raise ValueError(f'strandness need to be "split" or "both", got "{strandness}"') chrom_sizes = parse_chrom_size(chrom_size_path) chrom_sizes_list = [(k, v) for k, v in chrom_sizes.items()] # create bigwig file handles for each case # context_handle: key is mC context pattern like CHN, CAN, CGN, value is the output handle context_handle = {} output_path_collect = {} for bw_type in ["frac", "cov"]: out_suffix = f"{bw_type}.bw" for mc_context in mc_contexts: if strandness == "split": file_path = output_prefix + f".{mc_context}-Watson.{out_suffix}" output_path_collect[(mc_context, "Watson", out_suffix)] = file_path # handle for Watson/+ strand w_handle = pyBigWig.open(file_path, "w") w_handle.addHeader(chrom_sizes_list) context_handle[(mc_context, "+", bw_type)] = w_handle file_path = output_prefix + f".{mc_context}-Crick.{out_suffix}" output_path_collect[(mc_context, "Crick", out_suffix)] = file_path # handle for Crick/- strand c_handle = pyBigWig.open(file_path, "w") c_handle.addHeader(chrom_sizes_list) context_handle[(mc_context, "-", bw_type)] = c_handle else: # handle for both strand file_path = output_prefix + f".{mc_context}-{strandness}.{out_suffix}" output_path_collect[(mc_context, strandness, out_suffix)] = file_path _handle = pyBigWig.open(file_path, "w") _handle.addHeader(chrom_sizes_list) context_handle[mc_context, bw_type] = _handle def _init_counter(_contexts, _strandness): if _strandness == "split": # a counter for +/- strand separately _counter = StrandContextCounter(_contexts) else: # a counter for both +/- strands _counter = ContextCounter(_contexts) return _counter with pysam.TabixFile(allc_path) as allc: allc_chroms = set(allc.contigs) for chrom, chrom_size in chrom_sizes.items(): if chrom not in allc_chroms: continue counter = _init_counter(mc_contexts, strandness) cur_bin = 0 for line in allc.fetch(chrom): _, pos, strand, context, mc, cov, _ = line.split("\t") pos = int(pos) mc = float(mc) cov = float(cov) this_bin = (pos - 1) // bin_size if this_bin != cur_bin: # dump cur_bin counts bin_start = int(cur_bin * bin_size) write_entry( counter=counter, context_handle=context_handle, mc_contexts=mc_contexts, strandness=strandness, chrom=chrom, bin_start=bin_start, bin_size=bin_size, ) # initiate next bin cur_bin = this_bin counter = _init_counter(mc_contexts, strandness) # add counts if strandness == "split": counter.add(context, strand, mc, cov) else: counter.add(context, mc, cov) # final bin of the chrom bin_start = int(cur_bin * bin_size) write_entry( counter=counter, context_handle=context_handle, mc_contexts=mc_contexts, strandness=strandness, chrom=chrom, bin_start=bin_start, bin_size=bin_size, ) print(chrom, "finished") for handle in context_handle.values(): handle.close() return output_path_collect