Source code for ALLCools.dmr.call_dms

import pathlib
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np
import pandas as pd
import pysam
import xarray as xr
import yaml

from ALLCools.mcds.utilities import write_ordered_chunks
from ALLCools.utilities import genome_region_chunks

from .rms_test import (
    calculate_residual,
    downsample_table,
    permute_root_mean_square_test,
)


def _read_single_allc(path, region):
    grange = region.split(":")[1]
    start, _ = grange.split("-")
    rows = []
    with pysam.TabixFile(path) as allc:
        for row in allc.fetch(region):
            _row = row.split("\t")[:6]
            rows.append(_row)
    data = pd.DataFrame(
        rows, columns=["chrom", "pos", "strand", "context", "mc", "cov"]
    ).set_index("pos")
    data.index = data.index.astype(np.int64)
    data["mc"] = data["mc"].astype(np.float64)
    data["cov"] = data["cov"].astype(np.float64)
    # important: turn the second column from cov into unmethylated counts
    data["c"] = data["cov"] - data["mc"]
    # border case: if the pos equals the region start, it might be included
    # twice in adjacent regions, because for a single C the end of one region
    # and the start of the next are the same coordinate
    data = data[data.index > int(start)]
    counts = data[["mc", "c"]].astype(np.float64)
    contexts = data["context"]
    return counts, contexts
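
# Usage sketch (illustrative, not part of the module): an ALLC file is a
# bgzip-compressed, tabix-indexed table whose records carry chrom, pos,
# strand, context, mc (methylated reads), and cov (total coverage). The path
# and region below are hypothetical placeholders.
#
#     counts, contexts = _read_single_allc(
#         "sample_a.allc.tsv.gz", "chr1:0-20000000"
#     )
#     # counts: DataFrame indexed by pos with float columns ["mc", "c"]
#     # contexts: Series of cytosine contexts (e.g. "CGN") indexed by pos
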
def _make_count_table(path_list, region):
    all_contexts = {}
    all_counts = []
    for path in path_list:
        # counts has two columns: mc and c
        counts, contexts = _read_single_allc(path, region)
        all_contexts.update(contexts.to_dict())
        all_counts.append(counts)
    all_contexts = pd.Series(all_contexts, dtype="object").sort_index()
    total_counts = pd.concat(
        [df.reindex(all_contexts.index) for df in all_counts], axis=1
    ).fillna(0)
    return all_contexts, total_counts
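
# Layout sketch (illustrative): total_counts holds one row per cytosine seen
# in any sample, with each sample contributing an (mc, c) column pair in
# path_list order; positions absent from a sample are filled with zeros.
# The numbers below are synthetic.
#
#     import numpy as np
#     row = np.array([3.0, 1.0, 0.0, 4.0, 2.0, 2.0])  # two columns per sample
#     table = row.reshape(3, 2)  # 3 samples x (methylated, unmethylated)
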
def _perform_rms_batch(
    output_dir, allc_paths, samples, region, max_row_count, n_permute, min_pvalue
):
    contexts, total_counts = _make_count_table(path_list=allc_paths, region=region)
    n_samples = len(samples)
    total_values = {}
    p_values = {}
    if total_counts.shape[0] == 0:
        return None
    print(f"RMS tests for {total_counts.shape[0]} sites.")

    for pos, row in total_counts.iterrows():
        # table has two columns: mc and c
        table = row.values.reshape(len(samples), 2)
        _table = downsample_table(table, max_row_count=max_row_count)
        p = permute_root_mean_square_test(
            _table, n_permute=n_permute, min_pvalue=min_pvalue
        )
        # all the p-values will be saved for FDR
        p_values[pos] = p
        # but not all the site values
        if p <= min_pvalue:
            residual = calculate_residual(table)
            # mc_residual = -c_residual, so only save one column
            residual = residual[:, [0]]
            table[:, 1] = table.sum(axis=1)  # turn c back into cov
            site_values = np.concatenate([table, residual], axis=1)
            total_values[pos] = site_values

    n_sites = len(total_values)
    site_matrix = np.zeros((n_samples, n_sites, 3))
    pos_order = []
    for i, (pos, site) in enumerate(total_values.items()):
        site_matrix[:, i, :] = site
        pos_order.append(pos)

    da = xr.DataArray(
        site_matrix[:, :, :2],
        coords=[samples, pos_order, ["mc", "cov"]],
        dims=["sample", "pos", "count_type"],
    )
    residual_da = xr.DataArray(
        site_matrix[:, :, 2], coords=[samples, pos_order], dims=["sample", "pos"]
    )
    if da.get_index("pos").size == 0:
        # the pos dtype is wrong if there is no record at all
        da.coords["pos"] = da.coords["pos"].astype(int)

    p_values = pd.Series(p_values, dtype="float64")
    p_values.index.name = "pos"
    da.coords["p-values"] = p_values
    contexts = pd.Series(contexts, dtype="object")
    contexts.index.name = "pos"
    da.coords["contexts"] = contexts
    da.coords["chrom"] = pd.Series(region.split(":")[0], index=da.get_index("pos"))

    ds = xr.Dataset({"dms_da": da, "dms_residual": residual_da})
    with np.errstate(divide="ignore"):
        ds["dms_da_frac"] = da.sel(count_type="mc") / da.sel(count_type="cov")

    # sort dataset and reset index
    ds = ds.sortby([ds.coords["chrom"], ds.coords["pos"]])
    ds = ds.rename({"pos": "dms"})
    # "dms" still holds the genome coordinates; keep a copy as "pos"
    ds.coords["pos"] = ds.coords["dms"].copy()
    # use str coords, otherwise saving to zarr raises an error
    ds.coords["chrom"] = ds.coords["chrom"].astype("str")
    # reset the dms index to string ids of the form "{chrom}-{pos}"
    ds.coords["dms"] = (
        ds.coords["chrom"].to_pandas().astype(str)
        + "-"
        + ds.coords["dms"].to_pandas().astype(str)
    )
    ds.coords["contexts"] = ds.coords["contexts"].astype("str")
    # rename non-dimension coords to prevent collisions when merging with other ds
    ds = ds.rename({k: f"dms_{k}" for k in ds.coords.keys() if k not in ds.dims})

    # save total dms results
    output_path = f"{output_dir}/{region}.zarr"
    ds.to_zarr(output_path, mode="w")
    return output_path
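
# Statistical sketch (illustrative): each site is tested as a samples-by-2
# count table of methylated vs. unmethylated reads. downsample_table appears
# (per its name and max_row_count parameter) to cap per-sample read totals
# before testing, and the permutation test shuffles reads across samples to
# build a null distribution of the root-mean-square statistic. The counts
# below are synthetic.
#
#     table = np.array([[10.0, 2.0],   # sample A: 10 methylated, 2 unmethylated
#                       [3.0, 9.0]])   # sample B: 3 methylated, 9 unmethylated
#     _table = downsample_table(table, max_row_count=50)
#     p = permute_root_mean_square_test(_table, n_permute=3000, min_pvalue=0.01)
#     residual = calculate_residual(table)  # per-sample deviation; mc = -c column
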
def call_dms(
    output_dir,
    allc_paths,
    samples,
    chrom_size_path,
    cpu=1,
    max_row_count=50,
    n_permute=3000,
    min_pvalue=0.01,
    region=None,
):
    """
    Call DMS (differentially methylated sites) from multiple ALLC files.

    Parameters
    ----------
    output_dir
        Output directory; created if it does not exist.
    allc_paths
        Paths of the input ALLC files (bgzipped and tabix-indexed).
    samples
        Sample names, in the same order as allc_paths.
    chrom_size_path
        Path to the chrom sizes file; a copy is placed in output_dir.
    cpu
        Number of worker processes.
    max_row_count
        Maximum row count used when downsampling each site's count table.
    n_permute
        Number of permutations for the RMS test.
    min_pvalue
        P-value threshold below which per-site counts and residuals are saved.
    region
        Optional region string or list of region strings; if provided, only
        these regions are tested and cpu is set to 1.

    Returns
    -------
    None
    """
    pathlib.Path(output_dir).mkdir(exist_ok=True)
    with open(f"{output_dir}/.ALLCools", "w") as f:
        config = {"region_dim": "dms", "ds_region_dim": {"dms": "dms"}}
        yaml.dump(config, f)
    subprocess.run(
        f"cp {chrom_size_path} {output_dir}/chrom_sizes.txt", shell=True, check=True
    )
    chrom_size_path = f"{output_dir}/chrom_sizes.txt"

    # split chromosomes into chunks for parallel processing
    if region is None:
        regions = genome_region_chunks(chrom_size_path, bin_length=20000000)
    else:
        # only calculate user-provided regions
        if isinstance(region, list):
            regions = region
        else:
            regions = [region]
        cpu = 1

    # temp dir for per-region chunks
    dms_chunk_dir = pathlib.Path(f"{output_dir}/.dms_chunks")
    dms_chunk_dir.mkdir(exist_ok=True, parents=True)
    dms_dir = f"{output_dir}/dms"

    # trigger the numba JIT compilation before multiprocessing
    table = np.array([[0, 1], [0, 1]])
    permute_root_mean_square_test(table)
    calculate_residual(table)
    downsample_table(table, max_row_count=max_row_count)

    # process each chunk in parallel
    with ProcessPoolExecutor(cpu) as exe:
        futures = {}
        for chunk_id, region in enumerate(regions):
            future = exe.submit(
                _perform_rms_batch,
                output_dir=dms_chunk_dir,
                allc_paths=allc_paths,
                samples=samples,
                region=region,
                max_row_count=max_row_count,
                n_permute=n_permute,
                min_pvalue=min_pvalue,
            )
            futures[future] = chunk_id

        chunks_to_write = {}
        for future in as_completed(futures):
            chunk_i = futures[future]
            output_path = future.result()
            chunks_to_write[chunk_i] = output_path

    write_ordered_chunks(
        chunks_to_write,
        final_path=dms_dir,
        append_dim="dms",
        engine="zarr",
        coord_dtypes=None,
    )
    subprocess.run(f"rm -rf {dms_chunk_dir}", shell=True)
    return
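
# Usage sketch (illustrative): file paths and sample names below are
# hypothetical placeholders. Results are written as a zarr dataset under
# f"{output_dir}/dms", with one entry along the "dms" dimension per tested
# cytosine, alongside a .ALLCools config file and a copy of the chrom sizes.
#
#     call_dms(
#         output_dir="dms_output",
#         allc_paths=["sample_a.allc.tsv.gz", "sample_b.allc.tsv.gz"],
#         samples=["sample_a", "sample_b"],
#         chrom_size_path="mm10.chrom.sizes",
#         cpu=8,
#     )
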