Source code for ALLCools.dmr.call_dmr

import xarray as xr
import numpy as np
import pandas as pd
from ..mcds import RegionDS
from ..mcds.utilities import write_ordered_chunks, update_dataset_config
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

[docs]def _call_dmr_single_chrom( dms_dir, output_dir, chrom, p_value_cutoff=0.001, frac_delta_cutoff=0.2, max_dist=250, residual_quantile=0.6, corr_cutoff=0.3, dms_ratio=0.8, ): """Call DMR for single chromosome, see call_dmr for doc""" ds =, region_dim="dms") # open DMS dataset and select single chromosome, significant, large delta DMS p_value_judge = ds.coords["dms_p-values"] < p_value_cutoff frac_delta = ds["dms_da_frac"].max(dim="sample") - ds["dms_da_frac"].min( dim="sample" ) frac_delta_judge = frac_delta > frac_delta_cutoff chrom_judge = ds["dms_chrom"] == chrom # has to use load(scheduler='sync') when this function is called from multiprocess final_judge = ( (p_value_judge & frac_delta_judge & chrom_judge) .load(scheduler="sync") .to_pandas() ) if final_judge.sum() == 0: # No DMS remain return use_dms = final_judge[final_judge].index ds = ds.sel(dms=use_dms).load(scheduler="sync") # step 1: combine raw DMR windows based on distance dms_dist = ds["dms_pos"][1:].values - ds["dms_pos"][:-1].values dist_judge = dms_dist < max_dist cur_dmr = 0 dmr_ids = [0] for i in dist_judge: if not i: cur_dmr += 1 dmr_ids.append(cur_dmr) dmr_ids = pd.Series(dmr_ids, index=ds.get_index("dms")) # step 2: determine correlation between adjacent DMS data = ds["dms_da_frac"].transpose("dms", "sample").to_pandas() a = data.iloc[:-1, :].reset_index(drop=True) b = data.iloc[1:, :].reset_index(drop=True) # index of corr means the correlation of that DMS with the previous DMS # regardless of the genome distance # fill na value with 1, tend to not to split DMR due to nan corrs = a.fillna(1).corrwith(b.fillna(1), axis=1, method="pearson") corrs.index = data.iloc[1:, :].index.copy() # step 3: recalculate DMR windows based on both distance and correlation raw_dmr_table = pd.DataFrame({"dmr": dmr_ids, "corr": corrs}) dmr_dict = {} cur_dmr_id = 0 for dmr_id, sub_df in raw_dmr_table.groupby("dmr"): dmr_dict[sub_df.index[0]] = cur_dmr_id if sub_df.shape[0] > 1: for dms_id, corr in sub_df["corr"][1:].items(): if corr > corr_cutoff: dmr_dict[dms_id] = cur_dmr_id else: cur_dmr_id += 1 dmr_dict[dms_id] = cur_dmr_id cur_dmr_id += 1 dmr_ids = pd.Series(dmr_dict) = "dms" ds.coords["dmr"] = dmr_ids # step 4: determine sample hypo or hyper in each DMS and DMR based on residual if residual_quantile < 0.5: residual_quantile = 1 - residual_quantile residual_low_cutoff, residual_high_cutoff = np.nanquantile( ds["dms_residual"], [1 - residual_quantile, residual_quantile] ) lower_residual = ds["dms_residual"] < residual_low_cutoff higher_residual = ds["dms_residual"] > residual_high_cutoff # for each sample in each DMS # -1 means hypo methylation, 1 means hyper methylation, 0 mean no sig change dms_states = lower_residual.astype(int) * -1 + higher_residual.astype(int) # dmr state judge # use mean to calculate the overall DMR state from DMS dmr_states = dms_states.groupby("dmr").mean() # then mask inconsistent DMR states with the dms_ratio cutoff dmr_states = xr.where(np.abs(dmr_states) < dms_ratio, 0, dmr_states) # then "round" the dmr_states, so it only contains -1, 0, 1 dmr_states = xr.where(dmr_states > 0, 1, dmr_states) dmr_states = xr.where(dmr_states < 0, -1, dmr_states) # step 5: prepare dmr counts and fractions dmr_da = ds["dms_da"].groupby("dmr").sum() dmr_frac = dmr_da.sel(count_type="mc") / dmr_da.sel(count_type="cov") dmr_ds = xr.Dataset( { "dmr_da": dmr_da.astype(np.uint32), "dmr_state": dmr_states.astype(np.int8), "dmr_da_frac": dmr_frac.astype(np.float32), } ) # add n dms counts n_dms = dmr_ids.value_counts().sort_index() = "dmr" dmr_ds.coords["ndms"] = n_dms # add genome coords dmr_ds.coords["chrom"] = pd.Series( [chrom for _ in range(dmr_ds.get_index("dmr").size)], index=dmr_ds.get_index("dmr"), ) dmr_ds.coords["start"] = ds["dms_pos"].groupby(ds["dmr"]).min().to_pandas() - 1 dmr_ds.coords["end"] = ds["dms_pos"].groupby(ds["dmr"]).max().to_pandas() + 1 dmr_ds.coords["length"] = dmr_ds.coords["end"] - dmr_ds.coords["start"] dmr_ds.coords["dmr"] = ( dmr_ds["chrom"].to_pandas().astype(str) + "-" + dmr_ds["dmr"].to_pandas().astype(str) ) # rename none dimensional coords to prevent collision when merge with other ds dmr_ds = dmr_ds.rename( {k: f"dmr_{k}" for k in dmr_ds.coords.keys() if k not in dmr_ds.dims} ) output_path = f"{output_dir}/{chrom}.zarr" dmr_ds.to_zarr(output_path) return output_path
[docs]def call_dmr( output_dir, replicate_label=None, p_value_cutoff=0.001, frac_delta_cutoff=0.2, max_dist=250, residual_quantile=0.6, corr_cutoff=0.3, dms_ratio=0.8, cpu=1, chrom=None, ): """ Call DMR from DMS results. Parameters ---------- output_dir replicate_label p_value_cutoff frac_delta_cutoff max_dist residual_quantile corr_cutoff dms_ratio cpu chrom Returns ------- """ if chrom is None: chrom_size_path = f"{output_dir}/chrom_sizes.txt" from ..utilities import parse_chrom_size chroms = parse_chrom_size(chrom_size_path).keys() else: if isinstance(chrom, list): chroms = chrom else: chroms = [chrom] chunk_dir = f"{output_dir}/.dmr_chunk" dmr_dir = f"{output_dir}/dmr" with ProcessPoolExecutor(cpu) as exe: futures = {} for chunk_i, chrom in enumerate(chroms): future = exe.submit( _call_dmr_single_chrom, dms_dir=f"{output_dir}/dms", output_dir=chunk_dir, chrom=chrom, p_value_cutoff=p_value_cutoff, frac_delta_cutoff=frac_delta_cutoff, max_dist=max_dist, residual_quantile=residual_quantile, corr_cutoff=corr_cutoff, dms_ratio=dms_ratio, ) futures[future] = chunk_i chunks_to_write = {} for future in as_completed(futures): chunk_i = futures[future] output_path = future.result() chunks_to_write[chunk_i] = output_path write_ordered_chunks( chunks_to_write, final_path=dmr_dir, append_dim="dmr", engine="zarr", coord_dtypes=None, )"rm -rf {chunk_dir}", shell=True) update_dataset_config(output_dir=output_dir, add_ds_region_dim={"dmr": "dmr"}, change_region_dim="dmr") if replicate_label is not None: collapse_replicates(output_dir, replicate_label) return
[docs]def collapse_replicates(region_ds, replicate_label, state_da="dmr_state"): if not isinstance(replicate_label, str): # assume the replicate label is provided as a series or dataarray # that can be added into coords region_ds.coords["replicate_label"] = replicate_label replicate_label_name = "replicate_label" else: replicate_label_name = replicate_label # check if label exist and complete assert replicate_label_name in region_ds.coords assert region_ds.coords[replicate_label_name].to_pandas().isna().sum() == 0 sample_dim, *_ = region_ds.coords[replicate_label_name].dims collapsed = {} for rep, sub_da in region_ds[state_da].groupby(replicate_label_name): std = sub_da.std(dim=sample_dim) # std == 0 means all replicates having same state reduced_da = xr.where( std == 0, sub_da.sel({sample_dim: sub_da[sample_dim][0]}), 0 ).to_pandas() collapsed[rep] = reduced_da collapsed = pd.DataFrame(collapsed) = f"{sample_dim}_collapsed" = region_ds.region_dim region_ds[f"{state_da}_collapsed"] = collapsed if region_ds.location is not None: region_ds[[f"{state_da}_collapsed"]].to_zarr( f"{region_ds.location}/{region_ds.region_dim}/", mode="a" ) print(f"Collapsed sample state added in exist RegionDS at {region_ds.location}") return