Source code for ALLCools.mcds.region_ds

import os
import pathlib
import shutil
import subprocess
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed

import dask
import joblib
import numpy as np
import pandas as pd
import pyBigWig
import pybedtools
import xarray as xr
import yaml
from pybedtools import BedTool

from ALLCools.utilities import parse_chrom_size
from .utilities import determine_engine, obj_to_str, write_ordered_chunks, update_dataset_config

os.environ["NUMEXPR_MAX_THREADS"] = "16"


def _bigwig_over_bed(bed: pd.DataFrame, path, value_type="mean", dtype="float32"):
    with pyBigWig.open(path) as bw:

        def _region_stat(row, t=value_type):
            chrom, start, end, *_ = row
            try:
                value = bw.stats(chrom, start, end, type=t)[0]
            except RuntimeError:
                # happens when the region has an error or the chrom does not exist in the bigwig;
                # let the user decide what to do, here just return nan
                value = np.NaN
            return value

        values = bed.apply(_region_stat, t=value_type, axis=1)
        values = values.astype(dtype)
    return values

def _region_bed_sorted(bed_path, g, bed_sorted):
    chrom_sizes = parse_chrom_size(g)

    bed_df = pd.read_csv(bed_path, sep="\t", index_col=None, header=None)
    # select chroms that exist in g
    bed_df = bed_df.loc[bed_df.iloc[:, 0].isin(chrom_sizes.keys())]

    bed = BedTool.from_dataframe(bed_df)
    if bed_sorted:
        return bed
    else:
        return bed.sort(g=g)

def _bed_intersection(bed: pybedtools.BedTool, path, g, region_index, bed_sorted, fraction=0.2):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        query_bed = _region_bed_sorted(path, g, bed_sorted)
        try:
            df = bed.intersect(
                query_bed, wa=True, f=fraction, g=g, sorted=True
            ).to_dataframe()
            if df.shape[0] == 0:
                regions_idx = pd.Series([])
            else:
                regions_idx = df["name"]
        except pd.errors.EmptyDataError:
            regions_idx = pd.Series([])
    regions = pd.Index(regions_idx.values)
    bool_series = pd.Series(region_index.isin(regions), index=region_index)

    query_bed.delete_temporary_history(ask=False)
    return bool_series

def _annotate_by_bigwigs_worker(
    dataset_path,
    region_dim,
    chrom_size_path,
    track_paths,
    output_path,
    dim,
    slop,
    value_type,
    dtype,
    **kwargs,
):
    len(kwargs)
    # set dask scheduler to allow multiprocessing
    with dask.config.set(scheduler="sync"):
        # Open region ds again inside the worker function
        region_ds = RegionDS.open(
            path=dataset_path, region_dim=region_dim, chrom_size_path=chrom_size_path
        )

        # get dmr region bed and bigwig files
        dmr_bed = region_ds.get_bed(
            with_id=False, bedtools=False, slop=slop, chrom_size_path=chrom_size_path
        )

        # iterate each bigwig
        total_values = {}
        for sample, bigwig_path in track_paths.items():
            values = _bigwig_over_bed(
                bed=dmr_bed, path=bigwig_path, value_type=value_type, dtype=dtype
            )
            total_values[sample] = values
        total_values = pd.DataFrame(total_values)
        total_values.columns.name = dim
        total_values.index.name = region_dim

        ds = xr.Dataset({f"{region_dim}_{dim}_da": total_values})
        ds.to_zarr(output_path, mode="w")
    return output_path

def _annotate_by_beds_worker(
    dataset_path,
    region_dim,
    chrom_size_path,
    slop,
    track_paths,
    dtype,
    dim,
    output_path,
    bed_sorted,
    fraction=0.2,
    **kwargs,
):
    len(kwargs)
    # set dask scheduler to allow multiprocessing
    with dask.config.set(scheduler="sync"):
        # Open region ds again inside the worker function
        region_ds = RegionDS.open(
            path=dataset_path, region_dim=region_dim, chrom_size_path=chrom_size_path
        )

        # get dmr region bed
        dmr_bed = region_ds.get_bed(
            with_id=True, bedtools=True, slop=slop, chrom_size_path=chrom_size_path
        ).sort(g=chrom_size_path)

        total_values = {}
        for sample, bed_path in track_paths.items():
            values = _bed_intersection(
                bed=dmr_bed,
                path=bed_path,
                bed_sorted=bed_sorted,
                g=chrom_size_path,
                region_index=region_ds.get_index(region_ds.region_dim),
                fraction=fraction,
            )
            total_values[sample] = values.astype(dtype)
        total_values = pd.DataFrame(total_values)
        total_values.columns.name = dim

        ds = xr.Dataset({f"{region_dim}_{dim}_da": total_values})
        ds.to_zarr(output_path, mode="w")

        dmr_bed.delete_temporary_history(ask=False)
    return output_path

def _fisher_exact(row, alternative="two-sided"):
    from scipy.stats import fisher_exact

    oddsratio, p = fisher_exact(row.values.reshape((2, 2)), alternative=alternative)
    value = pd.Series({"oddsratio": oddsratio, "p": p})
    return value

class RegionDS(xr.Dataset):
    __slots__ = ()
    def __init__(self, dataset, region_dim=None, location=None, chrom_size_path=None):
        super().__init__(
            data_vars=dataset.data_vars, coords=dataset.coords, attrs=dataset.attrs
        )
        self.region_dim = region_dim
        self.location = location
        self.chrom_size_path = chrom_size_path
        return

    @property
    def region_dim(self):
        return self.attrs.get("region_dim")
    @region_dim.setter
    def region_dim(self, region_dim):
        if region_dim is not None:
            if region_dim not in self.dims:
                raise KeyError(
                    f"{region_dim} does not occur in dimension names: {list(self.dims.keys())}"
                )
            self.attrs["region_dim"] = region_dim
        else:
            return

    @property
    def chrom_size_path(self):
        return self.attrs.get("chrom_size_path")
    @chrom_size_path.setter
    def chrom_size_path(self, chrom_size_path):
        if chrom_size_path is not None:
            chrom_size_path = pathlib.Path(chrom_size_path).absolute()
            if not chrom_size_path.exists():
                raise FileNotFoundError(str(chrom_size_path))
            self.attrs["chrom_size_path"] = str(chrom_size_path)
        else:
            return

    @property
    def location(self):
        return self.attrs.get("region_ds_location")
    @location.setter
    def location(self, path):
        if path is not None:
            location = pathlib.Path(path).absolute()
            self.attrs["region_ds_location"] = str(location)
            location.mkdir(exist_ok=True, parents=True)
        else:
            return

    @classmethod
    def from_bed(
        cls, bed, location, chrom_size_path, region_dim="region", sort_bed=True
    ):
        """
        Create an empty RegionDS from a bed file.

        Parameters
        ----------
        bed
            Path to a bed file, or a bed-like pd.DataFrame.
        location
            On-disk location of the RegionDS.
        region_dim
            Dimension name of the regions.
        chrom_size_path
            Path to the chrom sizes file.
        sort_bed
            Whether to sort the bed file based on chrom_size_path.

        Returns
        -------
        RegionDS
        """
        # sort bed based on chrom_size_path
        if isinstance(bed, (str, pathlib.PosixPath)):
            if sort_bed:
                bed = BedTool(bed).sort(g=chrom_size_path).to_dataframe()
            else:
                bed = BedTool(bed).to_dataframe()
        else:
            bed = bed

        n_cols = bed.shape[1]
        if n_cols == 3:
            bed.index = bed.index.map(lambda i: f"{region_dim}_{i}")
        elif n_cols == 4:
            bed.set_index(bed.columns[3], inplace=True)
        else:
            raise ValueError(
                "bed file needs to be either 3 columns (chrom, start, end) "
                "or 4 columns (chrom, start, end, name)"
            )
        bed.index.name = region_dim
        bed.columns = ["chrom", "start", "end"]

        ds = xr.Dataset({})
        region_dim = bed.index.name
        for k, v in bed.items():
            key = f"{region_dim}_{k}"
            ds.coords[key] = v
            if ds.coords[key].dtype == "object":
                ds.coords[key] = ds.coords[key].astype(str)

        location = pathlib.Path(location).absolute()
        location.mkdir(exist_ok=True, parents=True)
        region_ds = cls(
            ds,
            region_dim=region_dim,
            location=location,
            chrom_size_path=chrom_size_path,
        )
        region_ds.save()
        return region_ds

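    # Usage sketch (not part of the class definition; all paths below are hypothetical):
    #     region_ds = RegionDS.from_bed(
    #         "dmr.bed",
    #         location="my_region_ds",
    #         chrom_size_path="mm10.chrom.sizes",
    #         region_dim="dmr",
    #     )
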
    @classmethod
    def open(
        cls,
        path,
        region_dim=None,
        use_regions=None,
        split_large_chunks=True,
        chrom_size_path=None,
        select_dir=None,
        chunks="auto",
        engine="zarr",
    ):
        if isinstance(path, (str, pathlib.PosixPath)):
            _path = pathlib.Path(path).absolute()
            if _path.is_dir():
                # check if this is a RegionDS dir that contains multiple datasets
                region_ds_file = _path / ".ALLCools"
                if region_ds_file.exists():
                    with open(region_ds_file) as f:
                        region_ds_config = yaml.load(f, yaml.SafeLoader)
                    if region_dim is None:
                        region_dim = region_ds_config["region_dim"]
                        print(f"Using {region_dim} as region_dim")

                    # only open datasets having the region_dim, other datasets will not be opened,
                    # e.g., when region_dim == 'dmr', the dms dataset will not be opened.
                    exclude_dir_name = [
                        k
                        for k, v in region_ds_config["ds_region_dim"].items()
                        if v != region_dim
                    ]

                    # add output_dir
                    region_ds_location = pathlib.Path(path).absolute()

                    # add chrom size path if it exists
                    if chrom_size_path is None:
                        chrom_size_path = _path / "chrom_sizes.txt"
                        if not chrom_size_path.exists():
                            chrom_size_path = None

                    # read each sub dir as a RegionDS, then merge all the RegionDS together.
                    datasets = []
                    for sub_dir_path in _path.iterdir():
                        if sub_dir_path.is_dir() and sub_dir_path.name[0] != ".":
                            if select_dir is not None:
                                if sub_dir_path.name not in select_dir:
                                    # not in select_dir list, skip
                                    continue
                            if sub_dir_path.name in exclude_dir_name:
                                # the dir does not have region_dim, skip
                                continue
                            if not (sub_dir_path / ".zattrs").exists():
                                # print(f'{sub_dir_path} does not seem to be a zarr storage, skipped')
                                continue
                            try:
                                datasets.append(
                                    cls._open_single_dataset(
                                        path=sub_dir_path,
                                        region_dim=region_dim,
                                        split_large_chunks=split_large_chunks,
                                        engine=engine,
                                    )
                                )
                            except BaseException as e:
                                print(f"An error raised when reading {sub_dir_path}.")
                                raise e
                    region_ds = cls(
                        xr.merge(datasets),
                        region_dim=region_dim,
                        location=region_ds_location,
                        chrom_size_path=chrom_size_path,
                    )
                else:
                    # is a dir, but is not a RegionDS dir, could be just a zarr dir
                    region_ds = cls._open_single_dataset(
                        path=path,
                        region_dim=region_dim,
                        split_large_chunks=split_large_chunks,
                        chrom_size_path=chrom_size_path,
                        engine=engine,
                    )
            else:
                # dataset stored in another format, such as netcdf4
                region_ds = cls._open_single_dataset(
                    path=path,
                    region_dim=region_dim,
                    split_large_chunks=split_large_chunks,
                    chrom_size_path=chrom_size_path,
                    engine=engine,
                )
        else:
            # could be a list of paths, open it as a single dataset
            path = list(path)
            region_ds = cls._open_single_dataset(
                path=path,
                region_dim=region_dim,
                split_large_chunks=split_large_chunks,
                chrom_size_path=chrom_size_path,
                engine=engine,
            )

        if use_regions is not None:
            region_ds = region_ds.sel({region_dim: use_regions})

        if chunks is not None:
            # change object dtype to string
            if obj_to_str:
                for k in region_ds.coords.keys():
                    if region_ds.coords[k].dtype == "O":
                        region_ds.coords[k] = region_ds.coords[k].astype(str)
            if chunks == "auto":
                chunks = {
                    k: min(4096, max(v // 5, 1)) for k, v in region_ds.dims.items()
                }
            region_ds = region_ds.chunk(chunks=chunks)
        return region_ds

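    # Usage sketch (hypothetical paths; region_dim is read from the .ALLCools config if omitted):
    #     dmr_ds = RegionDS.open("my_region_ds", region_dim="dmr")
    #     # open only selected sub-datasets of a RegionDS directory
    #     dmr_ds = RegionDS.open("my_region_ds", select_dir=["dmr", "dmr_motif"])
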
    @classmethod
    def _open_single_dataset(
        cls,
        path,
        region_dim,
        split_large_chunks=True,
        chrom_size_path=None,
        location=None,
        engine=None,
    ):
        """
        Take one or multiple RegionDS file paths and create a single RegionDS concatenated on region_dim.

        Parameters
        ----------
        path
            Single RegionDS path, RegionDS path pattern with wildcard, or RegionDS path list
        region_dim
            Dimension name of regions
        split_large_chunks
            Split large dask array chunks if true

        Returns
        -------
        RegionDS
        """
        # print('opening', path)
        if region_dim is None:
            raise ValueError(
                "Please specify a region_dim name when opening a normal xr.Dataset with RegionDS."
            )

        if engine is None:
            engine = determine_engine(path)
        # if engine is None:
        #     print(f'Open RegionDS with netcdf4 engine.')
        # else:
        #     print(f'Open RegionDS with {engine} engine.')
        try:
            if (isinstance(path, str) and "*" not in path) or isinstance(
                path, pathlib.PosixPath
            ):
                ds = xr.open_dataset(path, engine=engine)
            else:
                with dask.config.set(
                    **{"array.slicing.split_large_chunks": split_large_chunks}
                ):
                    if isinstance(path, str):
                        import glob

                        path = sorted([p for p in glob.glob(path)])
                    ds = xr.open_mfdataset(
                        path,
                        parallel=False,
                        combine="nested",
                        concat_dim=region_dim,
                        engine=engine,
                    )
        except Exception as e:
            print(f"Got error when opening {path}")
            print(f"Engine parameter is {engine}")
            raise e

        ds = cls(
            ds,
            region_dim=region_dim,
            location=location,
            chrom_size_path=chrom_size_path,
        )
        return ds

    def iter_index(self, chunk_size=100000, dim=None):
        if dim is None:
            dim = self.region_dim

        index = self.get_index(dim)
        for chunk_start in range(0, index.size, chunk_size):
            use_index = index[chunk_start : chunk_start + chunk_size]
            yield use_index

    def iter_array(self, chunk_size=100000, dim=None, da=None, load=False):
        if dim is None:
            dim = self.region_dim
        if da is None:
            da = f"{dim}_da"

        _da = self[da]
        try:
            assert dim in _da.dims
        except AssertionError as e:
            print("dim", dim)
            print("_da.dims", _da.dims)
            raise e

        for _index in self.iter_index(chunk_size=chunk_size, dim=dim):
            use_da = _da.sel({dim: _index})
            if load:
                use_da.load()
            yield use_da

    def get_fasta(
        self,
        genome_fasta,
        output_path,
        slop=None,
        chrom_size_path=None,
        standardize_length=None,
    ):
        bed = self.get_bed(
            with_id=True,
            bedtools=True,
            slop=slop,
            chrom_size_path=chrom_size_path,
            standardize_length=standardize_length,
        )
        bed.getfasta(fo=output_path, nameOnly=True, fi=genome_fasta)
        return

    def get_bed(
        self,
        with_id=True,
        bedtools=False,
        slop=None,
        chrom_size_path=None,
        standardize_length=None,
    ):
        if chrom_size_path is None:
            chrom_size_path = self.chrom_size_path  # will be None if not exist

        region_dim = self.region_dim
        bed_df = pd.DataFrame(
            {
                "chrom": self.coords[f"{region_dim}_chrom"],
                "start": self.coords[f"{region_dim}_start"],
                "end": self.coords[f"{region_dim}_end"],
            }
        )

        # standardize region length, used in motif enrichment analysis
        if standardize_length is not None:
            # standardize_length is an int number
            region_center = bed_df["start"] + (bed_df["end"] - bed_df["start"]) // 2
            bed_df["start"] = region_center - 1
            bed_df["end"] = region_center
            slop = (
                standardize_length // 2
            )  # use the bedtools slop to extend the center to standard length

        if with_id:
            bed_df["name"] = self.get_index(region_dim).tolist()

        bed = None
        if slop is not None and slop > 0:
            if chrom_size_path is None:
                raise ValueError("Must provide chrom_size_path when slop is not None.")
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                bed = BedTool.from_dataframe(bed_df).slop(b=slop, g=chrom_size_path)
                if not bedtools:
                    bed_df = bed.to_dataframe()

        if bedtools:
            if bed is None:
                bed = BedTool.from_dataframe(bed_df)
            return bed
        else:
            bed_df.index = self.get_index(self.region_dim)
            return bed_df

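    # Usage sketch (assumes chrom_size_path is set on the RegionDS when slop is used):
    #     bed_df = region_ds.get_bed(with_id=True, bedtools=False, slop=250)
    #     bedtool = region_ds.get_bed(bedtools=True, standardize_length=500)
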
    def _chunk_annotation_executor(self, annotation_function, cpu, save=True, **kwargs):
        chrom_size_path = kwargs["chrom_size_path"]
        dim = kwargs["dim"]
        chunk_size = kwargs["chunk_size"]
        # add back when submitting jobs, use this to control parallel
        track_paths = kwargs.pop("track_paths")

        region_ds_path = self.location
        if region_ds_path is None:
            raise ValueError("Must have an on-disk location to annotate bigwigs.")
        if chrom_size_path is None:
            chrom_size_path = self.chrom_size_path
        region_dim = self.region_dim

        # deal with path
        chunk_dir_path = f"{region_ds_path}/.{region_dim}_{dim}_chunks"
        chunk_dir_path = pathlib.Path(chunk_dir_path)
        if chunk_dir_path.exists():
            subprocess.run(f"rm -rf {chunk_dir_path}", shell=True)
        chunk_dir_path.mkdir(exist_ok=True)
        final_dir_path = f"{region_ds_path}/{region_dim}_{dim}"

        n_features = len(track_paths)
        if chunk_size == "auto":
            chunk_size = max(1, n_features // cpu // 2 + 1)
            print(f"Use chunk size {chunk_size}")

        other_kwargs = {
            "region_dim": region_dim,
            "dataset_path": region_ds_path,
            "chrom_size_path": chrom_size_path,
        }
        kwargs.update(other_kwargs)

        with ProcessPoolExecutor(cpu) as exe:
            futures = {}
            for i, chunk_start in enumerate(range(0, n_features, chunk_size)):
                output_path = f"{chunk_dir_path}/chunk_{i}.zarr"
                kwargs["output_path"] = output_path
                kwargs["track_paths"] = track_paths[
                    chunk_start : chunk_start + chunk_size
                ]
                future = exe.submit(annotation_function, **kwargs)
                futures[future] = i
                # time.sleep(1)

            chunks_to_write = {}
            for i, future in enumerate(as_completed(futures)):
                chunk_i = futures[future]
                output_path = future.result()
                chunks_to_write[chunk_i] = output_path

        if save:
            write_ordered_chunks(
                chunks_to_write=chunks_to_write,
                final_path=final_dir_path,
                append_dim=dim,
                engine="zarr",
                dtype=kwargs["dtype"],
                coord_dtypes=None,
            )
            update_dataset_config(
                self.location,
                add_ds_region_dim={f"{region_dim}_{dim}": region_dim},
            )
            # load the newly generated da only
            _ds = xr.open_zarr(final_dir_path)
        else:
            _ds = xr.open_mfdataset(
                [chunks_to_write[k] for k in sorted(chunks_to_write.keys())],
                concat_dim=dim,
                combine="nested",
                engine="zarr",
            ).load()
        self.update(_ds)

        subprocess.run(f"rm -rf {chunk_dir_path}", shell=True)
        return

    def annotate_by_bigwigs(
        self,
        bigwig_table,
        dim,
        slop=100,
        chrom_size_path=None,
        value_type="mean",
        chunk_size="auto",
        dtype="float32",
        cpu=1,
        save=True,
    ):
        if isinstance(bigwig_table, dict):
            track_paths = pd.Series(bigwig_table)
        elif isinstance(bigwig_table, pd.Series):
            track_paths = bigwig_table
        elif isinstance(bigwig_table, str) and bigwig_table.endswith("csv"):
            track_paths = pd.read_csv(
                bigwig_table, index_col=0, squeeze=True, header=None
            )
        else:
            track_paths = pd.read_csv(
                bigwig_table, sep="\t", index_col=0, squeeze=True, header=None
            )

        kwargs = dict(
            track_paths=track_paths,
            dim=dim,
            slop=slop,
            chrom_size_path=chrom_size_path,
            value_type=value_type,
            chunk_size=chunk_size,
            dtype=dtype,
        )
        self._chunk_annotation_executor(
            _annotate_by_bigwigs_worker, cpu=cpu, save=save, **kwargs
        )
        return

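    # Usage sketch (hypothetical sample-to-bigwig mapping):
    #     region_ds.annotate_by_bigwigs(
    #         {"sampleA": "sampleA.CpG.bw", "sampleB": "sampleB.CpG.bw"},
    #         dim="sample-bigwig", slop=100, value_type="mean", cpu=4,
    #     )
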
    def annotate_by_beds(
        self,
        bed_table,
        dim,
        slop=100,
        chrom_size_path=None,
        chunk_size="auto",
        dtype="bool",
        bed_sorted=True,
        cpu=1,
        fraction=0.2,
        save=True,
    ):
        bed_tmp = pathlib.Path(
            f"./pybedtools_tmp_{np.random.randint(0, 100000)}"
        ).absolute()
        bed_tmp.mkdir(exist_ok=True)
        default_tmp = pybedtools.helpers.get_tempdir()
        pybedtools.helpers.set_tempdir(str(bed_tmp))

        if isinstance(bed_table, dict):
            track_paths = pd.Series(bed_table)
        elif isinstance(bed_table, pd.Series):
            track_paths = bed_table
        elif isinstance(bed_table, str) and bed_table.endswith("csv"):
            track_paths = pd.read_csv(
                bed_table, index_col=0, squeeze=True, header=None
            )
        else:
            track_paths = pd.read_csv(
                bed_table, sep="\t", index_col=0, squeeze=True, header=None
            )

        kwargs = dict(
            track_paths=track_paths,
            dim=dim,
            slop=slop,
            chrom_size_path=chrom_size_path,
            chunk_size=chunk_size,
            dtype=dtype,
            bed_sorted=bed_sorted,
            fraction=fraction,
        )
        self._chunk_annotation_executor(
            _annotate_by_beds_worker, cpu=cpu, save=save, **kwargs
        )

        subprocess.run(f"rm -rf {bed_tmp}", shell=True)
        # pybedtools actually changes tempfile.tempdir;
        # we must put it back, otherwise other packages might raise problems
        pybedtools.helpers.set_tempdir(default_tmp)
        return

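    # Usage sketch (hypothetical bed annotation tracks):
    #     region_ds.annotate_by_beds(
    #         {"ATAC_peak": "atac_peaks.bed.gz", "H3K27ac": "h3k27ac_peaks.bed.gz"},
    #         dim="peak-type", slop=100, dtype="bool", cpu=4,
    #     )
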
    def get_feature(self, feature_name, dim=None, da_name=None):
        if dim is None:
            try:
                data = self.coords[feature_name].to_pandas()
            except KeyError:
                raise KeyError(
                    f"{feature_name} does not exist in RegionDS.coords, "
                    f"if it belongs to a dataarray, please specify its dim name."
                )
        else:
            if da_name is None:
                da_name = f"{self.region_dim}_{dim}_da"
            data = self[da_name].sel({dim: feature_name}).to_pandas()
        return data

    def scan_motifs(
        self,
        genome_fasta,
        cpu=1,
        standardize_length=500,
        motif_set_path=None,
        chrom_size_path=None,
        combine_cluster=True,
        fnr_fpr_fold=1000,
        chunk_size=None,
        motif_dim="motif",
        snakemake=False,
    ):
        region_ds_path = self.location
        if region_ds_path is None:
            raise ValueError("Must have an on-disk location to scan motifs.")
        if chrom_size_path is None:
            chrom_size_path = self.attrs.get("chrom_size_path")

        # prepare fasta file for regions
        fasta_path = f"{region_ds_path}/regions.fasta"
        self.get_fasta(
            genome_fasta,
            output_path=fasta_path,
            slop=None,
            chrom_size_path=chrom_size_path,
            standardize_length=standardize_length,
        )

        if snakemake:
            # prepare snakemake files and scan motifs with larger parallelism
            self._scan_motifs_snakemake(
                fasta_path,
                output_dir=region_ds_path,
                cpu=cpu,
                motif_dim=motif_dim,
                combine_cluster=combine_cluster,
                motif_set_path=motif_set_path,
                fnr_fpr_fold=fnr_fpr_fold,
                chunk_size=chunk_size,
            )
        else:
            # directly scan motifs under the current process
            self._scan_motif_local(
                fasta_path=fasta_path,
                cpu=cpu,
                motif_set_path=motif_set_path,
                combine_cluster=combine_cluster,
                fnr_fpr_fold=fnr_fpr_fold,
                chunk_size=chunk_size,
                motif_dim=motif_dim,
            )

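    # Usage sketch (hypothetical genome fasta path):
    #     region_ds.scan_motifs(
    #         genome_fasta="mm10.fa", cpu=8, standardize_length=500,
    #         motif_dim="motif", combine_cluster=True,
    #     )
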
    def _scan_motif_local(
        self,
        fasta_path,
        cpu=1,
        motif_set_path=None,
        combine_cluster=True,
        fnr_fpr_fold=1000,
        chunk_size=None,
        motif_dim="motif",
    ):
        from ..motif import MotifSet

        if motif_set_path is not None:
            motif_set: MotifSet = joblib.load(motif_set_path)
        else:
            # default motif database from three sources
            from ..motif import get_default_motif_set

            motif_set = get_default_motif_set()

        if fnr_fpr_fold != 1000:
            motif_set.calculate_threshold(
                cpu=cpu, method="balance", threshold_value=fnr_fpr_fold
            )

        region_dim = self.region_dim
        region_ds_path = self.location
        motif_ds = motif_set.scan_motifs(
            fasta_path=fasta_path,
            output_dir=region_ds_path,
            cpu=cpu,
            region_dim=region_dim,
            combine_cluster=combine_cluster,
            motif_dim=motif_dim,
            chunk_size=chunk_size,
        )

        new_dataset_dim = {f"{region_dim}_{motif_dim}": region_dim}
        if combine_cluster:
            new_dataset_dim[f"{region_dim}_{motif_dim}-cluster"] = region_dim
        update_dataset_config(self.location, add_ds_region_dim=new_dataset_dim)

        self.update(motif_ds)
        return

    def _scan_motifs_snakemake(
        self,
        fasta_path,
        output_dir,
        cpu,
        motif_dim="motif",
        combine_cluster=True,
        motif_set_path=None,
        fnr_fpr_fold=1000,
        chunk_size=50000,
    ):
        if chunk_size is None:
            chunk_size = 50000
        from ALLCools.motif.snakemake import (
            prepare_motif_scan_snakemake,
            check_snakemake_success,
            save_motif_chunks,
        )

        region_ds_path = output_dir
        snakemake_temp_dir = f"{region_ds_path}/motif_scan_snakemake"
        output_dir = snakemake_temp_dir
        region_dim = self.region_dim

        status_path = f"{output_dir}/status"
        if pathlib.Path(status_path).exists():
            # check if all chunks executed successfully
            all_success = check_snakemake_success(output_dir)
            if all_success:
                print("Motif scan finished, merge chunks into RegionDS.")
                save_motif_chunks(
                    motif_chunk_dir=output_dir,
                    region_dim=region_dim,
                    output_path=f"{region_ds_path}/{region_dim}_{motif_dim}",
                    is_motif_cluster=False,
                )
                if combine_cluster:
                    save_motif_chunks(
                        motif_chunk_dir=output_dir,
                        region_dim=region_dim,
                        output_path=f"{region_ds_path}/{region_dim}_{motif_dim}-cluster",
                        is_motif_cluster=True,
                    )

                # update config
                new_dataset_dim = {f"{region_dim}_{motif_dim}": region_dim}
                if combine_cluster:
                    new_dataset_dim[f"{region_dim}_{motif_dim}-cluster"] = region_dim
                update_dataset_config(self.location, add_ds_region_dim=new_dataset_dim)

                # load to current obj
                motif_ds = xr.open_zarr(f"{region_ds_path}/{region_dim}_{motif_dim}")
                self.update(motif_ds)
                if combine_cluster:
                    motif_ds = xr.open_zarr(
                        f"{region_ds_path}/{region_dim}_{motif_dim}-cluster"
                    )
                    self.update(motif_ds)

                # finally, delete chunks
                shutil.rmtree(output_dir)
                return
            else:
                print(
                    "You can keep the successful chunks and "
                    "rerun this function once the remaining chunks are executed successfully."
                )
                return
        else:
            # prepare snakemake dir
            prepare_motif_scan_snakemake(
                output_dir=output_dir,
                fasta_path=fasta_path,
                region_dim=self.region_dim,
                motif_dim=motif_dim,
                motif_set_path=motif_set_path,
                chunk_size=chunk_size,
                combine_cluster=combine_cluster,
                fnr_fpr_fold=fnr_fpr_fold,
                cpu=cpu,
            )
            print(
                "Snakemake files are prepared, "
                "please execute the snakemake commands for the actual motif scan. "
                "Once everything is executed, you may rerun this function with the exact same parameters to "
                "resume and store the final results to RegionDS."
            )
            return

    def get_hypo_hyper_index(
        self,
        a,
        region_dim=None,
        region_state_da=None,
        sample_dim="sample",
        use_collapsed=True,
    ):
        if region_state_da is None:
            if region_dim is None:
                region_dim = self.region_dim
            if use_collapsed:
                region_state_da = f"{region_dim}_state_collapsed"
                if not sample_dim.endswith("_collapsed"):
                    sample_dim = f"{sample_dim}_collapsed"
                if region_state_da not in self.data_vars:
                    print(
                        f'use_collapsed=True but the "{region_dim}_state_collapsed" '
                        f"dataarray does not exist, provide the region_state_da name explicitly "
                        f"or set use_collapsed=False"
                    )
                    region_state_da = f"{region_dim}_state"
            else:
                region_state_da = f"{region_dim}_state"

        a_states = self[region_state_da].sel({sample_dim: a}).to_pandas()
        hypo_dmr = a_states == -1
        hypo_dmr = hypo_dmr[hypo_dmr].index

        hyper_dmr = a_states == 1
        hyper_dmr = hyper_dmr[hyper_dmr].index
        return hypo_dmr, hyper_dmr

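    # Usage sketch (hypothetical sample name):
    #     hypo_idx, hyper_idx = region_ds.get_hypo_hyper_index("sampleA")
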
    def get_pairwise_differential_index(
        self,
        a,
        b,
        dmr_type="hypo",
        region_dim=None,
        region_state_da=None,
        sample_dim="sample",
        use_collapsed=True,
    ):
        a_hypo, a_hyper = self.get_hypo_hyper_index(
            a,
            region_dim=region_dim,
            region_state_da=region_state_da,
            sample_dim=sample_dim,
            use_collapsed=use_collapsed,
        )
        b_hypo, b_hyper = self.get_hypo_hyper_index(
            b,
            region_dim=region_dim,
            region_state_da=region_state_da,
            sample_dim=sample_dim,
            use_collapsed=use_collapsed,
        )

        if dmr_type.lower() == "hypo" or dmr_type == -1:
            # a hypo, b not hypo
            a_not_b = a_hypo[~a_hypo.isin(b_hypo)]
            a_and_b = a_hypo[a_hypo.isin(b_hypo)]
            b_not_a = b_hypo[~b_hypo.isin(a_hypo)]
        elif dmr_type.lower() == "hyper" or dmr_type == 1:
            # a hyper, b not hyper
            a_not_b = a_hyper[~a_hyper.isin(b_hyper)]
            a_and_b = a_hyper[a_hyper.isin(b_hyper)]
            b_not_a = b_hyper[~b_hyper.isin(a_hyper)]
        else:
            raise ValueError(f"Unknown dmr_type {dmr_type}.")
        return a_not_b, a_and_b, b_not_a

    def motif_enrichment(
        self,
        true_regions,
        background_regions,
        region_dim=None,
        motif_dim="motif-cluster",
        motif_da=None,
        alternative="two-sided",
    ):
        if region_dim is None:
            region_dim = self.region_dim
        if motif_da is None:
            motif_da = f"{region_dim}_{motif_dim}_da"

        true_motif = self[motif_da].sel(
            {"motif_value": "n_motifs", region_dim: true_regions}
        )
        true_motif = (true_motif > 0).sum(dim=region_dim).to_pandas()
        true_no_motif = true_regions.size - true_motif

        bkg_motif = self[motif_da].sel(
            {"motif_value": "n_motifs", region_dim: background_regions}
        )
        bkg_motif = (bkg_motif > 0).sum(dim=region_dim).to_pandas()
        bkg_no_motif = background_regions.size - bkg_motif

        contingency_tables = pd.DataFrame(
            {"tp": true_motif, "tn": true_no_motif, "fp": bkg_motif, "fn": bkg_no_motif}
        )
        # add pseudo count
        contingency_tables += 1

        test_results = contingency_tables.apply(
            _fisher_exact, axis=1, alternative=alternative
        )

        from statsmodels.stats.multitest import multipletests

        _, q, *_ = multipletests(test_results["p"], method="fdr_bh")
        test_results["q"] = q

        test_results["log2OR"] = np.log2(test_results["oddsratio"])
        test_results["-lgq"] = -np.log10(q)
        return test_results

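    # Usage sketch, pairing hypo-DMRs against hyper-DMRs of one hypothetical sample:
    #     hypo_idx, hyper_idx = region_ds.get_hypo_hyper_index("sampleA")
    #     enrich_df = region_ds.motif_enrichment(hypo_idx, hyper_idx)
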
    def sample_dmr_motif_enrichment(
        self,
        sample,
        region_dim=None,
        sample_dim="sample",
        motif_dim="motif-cluster",
        region_state_da=None,
        motif_da=None,
        alternative="two-sided",
        use_collapsed=True,
    ):
        hypo_region, hyper_region = self.get_hypo_hyper_index(
            sample,
            region_dim=region_dim,
            region_state_da=region_state_da,
            sample_dim=sample_dim,
            use_collapsed=use_collapsed,
        )
        test_results = self.motif_enrichment(
            hypo_region,
            hyper_region,
            region_dim=region_dim,
            motif_dim=motif_dim,
            motif_da=motif_da,
            alternative=alternative,
        )
        return test_results

    def pairwise_dmr_motif_enrichment(
        self,
        a,
        b,
        dmr_type="hypo",
        region_dim=None,
        region_state_da=None,
        sample_dim="sample",
        motif_dim="motif-cluster",
        motif_da=None,
        alternative="two-sided",
    ):
        a_not_b, _, b_not_a = self.get_pairwise_differential_index(
            a,
            b,
            dmr_type=dmr_type,
            region_dim=region_dim,
            region_state_da=region_state_da,
            sample_dim=sample_dim,
        )
        test_results = self.motif_enrichment(
            a_not_b,
            b_not_a,
            region_dim=region_dim,
            motif_dim=motif_dim,
            motif_da=motif_da,
            alternative=alternative,
        )
        return test_results

    def object_coords_to_string(self, dtypes=None):
        obj_to_str(self, dtypes)
        return

    def save(self, da_name=None, output_path=None, mode="w", change_region_dim=True):
        if output_path is None:
            if self.location is None:
                raise ValueError("RegionDS.location is None when trying to save.")
            pathlib.Path(self.location).mkdir(parents=True, exist_ok=True)
            output_path = f"{self.location}/{self.region_dim}"
            update_dataset_config(
                self.location,
                add_ds_region_dim={self.region_dim: self.region_dim},
                change_region_dim=self.region_dim if change_region_dim else None,
            )

        # turn object coords to fixed-length string dtype before saving to zarr
        if da_name is None:
            ds_to_save = self
        else:
            ds_to_save = self[[da_name]]
        ds_to_save.object_coords_to_string()
        ds_to_save.to_zarr(output_path, mode=mode)
        return

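    # Usage sketch:
    #     region_ds.save()  # writes to {location}/{region_dim} as a zarr store
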
    def get_coords(self, name):
        return self.coords[name].to_pandas()