Source code for ALLCools.reptile.reptile

import pathlib
import subprocess
import warnings
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed

import dask
import joblib
import numpy as np
import pandas as pd
import pyBigWig
import pybedtools
import xarray as xr
from sklearn.model_selection import train_test_split

from ALLCools.mcds import RegionDS
from ALLCools.mcds.utilities import write_ordered_chunks, update_dataset_config


def _create_train_region_ds(reptile):
    # train regions
    train_regions_bed = pybedtools.BedTool(reptile.train_regions).sort(
        g=reptile.chrom_size_path
    )

    # train region labels
    train_label = pd.read_csv(
        reptile.train_region_labels, sep="\t", index_col=0, squeeze=True
    )
    train_label.index.name = "train-region"

    train_regions_bed_df = train_regions_bed.to_dataframe()

    # train RegionDS
    train_region_ds = RegionDS.from_bed(
        train_regions_bed_df,
        chrom_size_path=reptile.chrom_size_path,
        location=reptile.output_path,
        region_dim="train-region",
    )
    train_region_ds.coords["train-region_label"] = train_label
    train_region_ds.save()
    return train_regions_bed, train_label

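# A hedged illustration (not part of the library) of the train_region_labels
# file that the read_csv call above expects: a tab-separated file with a header
# row, whose first column is the train region name and whose second column is
# the binary enhancer label. The column and region names below are hypothetical.
#
#     region            label
#     train_region_0    1
#     train_region_1    0
#     train_region_2    1
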
def _create_train_dmr_ds(reptile, train_regions_bed, train_label):
    # total DMRs
    dmr_regions_bed = pybedtools.BedTool(reptile.dmr_regions).sort(
        g=reptile.chrom_size_path
    )
    dmr_regions_bed_df = dmr_regions_bed.to_dataframe()

    # train DMRs and train DMR labels
    train_dmr = train_regions_bed.map(dmr_regions_bed, c=4, o="collapse").to_dataframe()
    dmr_label = defaultdict(list)
    for _, row in train_dmr.iterrows():
        *_, train_region, dmrs = row
        if dmrs == ".":
            continue
        dmrs = dmrs.split(",")
        for dmr in dmrs:
            dmr_label[dmr].append(train_label[train_region])

    # some DMRs might have multiple labels
    consistent_dmr_label = {}
    for dmr, dmr_labels in dmr_label.items():
        if (len(dmr_labels) == 1) or (len(set(dmr_labels)) == 1):
            consistent_dmr_label[dmr] = dmr_labels[0]
        else:
            # skip DMRs with inconsistent labels
            continue
    dmr_label = pd.Series(consistent_dmr_label)
    dmr_label.index.name = "train-dmr"

    train_dmr_regions_bed_df = (
        dmr_regions_bed_df.set_index("name")
        .loc[dmr_label.index]
        .reset_index()
        .iloc[:, [1, 2, 3, 0]]
    )

    # train DMR RegionDS
    train_dmr_ds = RegionDS.from_bed(
        train_dmr_regions_bed_df,
        chrom_size_path=reptile.chrom_size_path,
        location=reptile.output_path,
        region_dim="train-dmr",
    )
    train_dmr_ds.coords["train-dmr_label"] = dmr_label
    train_dmr_ds.save()
    return dmr_regions_bed_df

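# A hedged worked example of the label-consistency rule above, with made-up
# names: a DMR overlapping two train regions labeled (1, 1) keeps label 1,
# because the label set has a single value; a DMR overlapping regions labeled
# (1, 0) is dropped from training entirely, since its labels conflict.
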
def _create_query_region_ds(reptile):
    pybedtools.BedTool().makewindows(
        g=reptile.chrom_size_path, s=reptile.step_size, w=reptile.window_size
    ).saveas(f"{reptile.output_path}/query_region.bed")

    query_region_ds = RegionDS.from_bed(
        f"{reptile.output_path}/query_region.bed",
        chrom_size_path=reptile.chrom_size_path,
        location=reptile.output_path,
        region_dim="query-region",
    )
    subprocess.run(f"rm -f {reptile.output_path}/query_region.bed", shell=True)
    query_region_ds.save()
    return

def _create_query_dmr_ds(reptile, dmr_regions_bed_df):
    query_dmr_ds = RegionDS.from_bed(
        dmr_regions_bed_df,
        chrom_size_path=reptile.chrom_size_path,
        location=reptile.output_path,
        region_dim="query-dmr",
    )
    query_dmr_ds.save()
    return

def _get_data_and_label(region_ds, modalities, sample, fillna_by_zero_list):
    data = {}
    for modality in modalities:
        da = region_ds[f"{region_ds.region_dim}_{modality}_da"]
        if modality in fillna_by_zero_list:
            da = da.fillna(0)
        sample_value = da.sel({modality: sample}).to_pandas()
        modality_mean = da.coords[f"{modality}_mean"]
        modality_std = da.coords[f"{modality}_std"]
        sample_dev = sample_value - modality_mean
        features = {
            f"{modality}_value": sample_value,
            f"{modality}_dev": sample_dev,
            f"{modality}_overall_std": modality_std,
        }
        data.update(features)
    data = pd.DataFrame(data)

    if region_ds.region_dim.startswith("train"):
        label = region_ds[f"{region_ds.region_dim}_label"].to_pandas()
    else:
        label = None
    return data, label

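# A hedged sketch of the feature matrix _get_data_and_label returns, assuming a
# hypothetical "mCG" modality column in bigwig_table; the numbers are made up:
#
#     region           mCG_value    mCG_dev    mCG_overall_std
#     chr1:0-2000           0.82       0.12               0.05
#
# "mCG_value" is the sample's mean signal over the region, "mCG_dev" is its
# deviation from the cross-sample mean of that region, and "mCG_overall_std"
# is the cross-sample standard deviation. Each modality contributes three such
# columns.
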
def _predict_sample(
    region_ds_path,
    region_dim,
    modalities,
    fillna_by_zero,
    sample,
    output_path,
    mask_cutoff=0.3,
    chunk_size=100000,
):
    # set dask scheduler to allow multiprocessing
    with dask.config.set(scheduler="sync"):
        if region_dim == "query-region":
            model = joblib.load(f"{region_ds_path}/model/train-region_model.lib")
        elif region_dim == "query-dmr":
            model = joblib.load(f"{region_ds_path}/model/train-dmr_model.lib")
        else:
            raise ValueError(
                f'Only accept ["query-region", "query-dmr"], got {region_dim}'
            )

        # for query, we don't filter NaN here, but drop NaN values in the final data table
        region_ds = RegionDS.open(region_ds_path, region_dim=region_dim)
        region_ids = region_ds.get_index(region_dim)

        total_proba = []
        for chunk_start in range(0, region_ids.size, chunk_size):
            use_regions = region_ids[chunk_start : chunk_start + chunk_size]
            _region_ds = region_ds.sel({region_dim: use_regions})
            data, _ = _get_data_and_label(
                region_ds=_region_ds,
                modalities=modalities,
                sample=sample,
                fillna_by_zero_list=fillna_by_zero,
            )
            # before dropna, save the index
            total_index = data.index.copy()
            # sample-specific NaN drop
            data.dropna(inplace=True)
            # predict
            proba = model.predict_proba(data.astype(np.float64))
            enhancer_proba = pd.Series(proba[:, 1], index=data.index).reindex(
                total_index
            )
            # NA values get 0 proba
            enhancer_proba.fillna(0, inplace=True)
            total_proba.append(enhancer_proba)
        total_proba = pd.DataFrame({sample: pd.concat(total_proba).astype(np.float16)})
        # mask small values
        total_proba[total_proba < mask_cutoff] = 0
        total_proba.index.name = region_ds.region_dim
        total_proba.columns.name = "sample"

        total_proba = xr.Dataset({f"{region_dim}_prediction": total_proba})
        RegionDS(total_proba).to_zarr(output_path, mode="w")
    return output_path

def _call_enhancer_region(bw_path, dmr_bed, threshold, merge_dist, chrom_size_path):
    high_score_regions = []
    with pyBigWig.open(bw_path) as bw:
        for chrom, length in bw.chroms().items():
            intervals = bw.intervals(chrom)
            cur_start = None
            cur_end = None
            if intervals is None:
                continue
            for start, end, score in intervals:
                if score > threshold:
                    # high score met, start or extend the current enhancer
                    if cur_start is None:
                        # init
                        cur_start = start
                        cur_end = end
                    else:
                        # update end
                        cur_end = end
                else:
                    # low score met, check and save the previous enhancer
                    if cur_start is None:
                        continue
                    else:
                        # save region
                        high_score_regions.append([chrom, cur_start, cur_end])
                        cur_start = None
                        cur_end = None
            # last region
            if cur_start is not None:
                high_score_regions.append([chrom, cur_start, cur_end])
    high_score_regions = pd.DataFrame(
        high_score_regions, columns=["chrom", "start", "end"]
    )

    # merge close enhancers, annotate DMR ids
    high_score_regions = (
        pybedtools.BedTool.from_dataframe(high_score_regions)
        .sort(g=chrom_size_path)
        .merge(d=merge_dist)
        .map(b=dmr_bed, c=4, o="collapse", g=chrom_size_path)
        .to_dataframe()
    )
    return high_score_regions

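# A hedged worked example of the scan above, with made-up intervals and
# threshold=0.7: two consecutive high-scoring intervals are extended into one
# region, a low-scoring interval closes it, and a later high score opens a new
# region that is flushed at the end of the chromosome.
#
#     intervals: (0, 10, 0.9) (10, 20, 0.8) (20, 30, 0.2) (40, 50, 0.75)
#     regions  : [0, 20]                                  [40, 50]
#
# The subsequent bedtools merge(d=merge_dist) would then join [0, 20] and
# [40, 50] into a single enhancer call if merge_dist >= 20.
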
class REPTILE:
    def __init__(
        self,
        output_path,
        train_regions,
        dmr_regions,
        train_region_labels,
        train_sample,
        bigwig_table,
        chrom_size_path,
        window_size=2000,
        step_size=200,
        dmr_slop=150,
        fillna_by_zero=None,
    ):
        try:
            from tpot import TPOTClassifier
        except ImportError:
            raise ImportError(
                "Got a tpot import error, install the tpot package: \n"
                "conda install -c conda-forge tpot xgboost dask dask-ml scikit-mdr skrebate \n"
                "See also https://epistasislab.github.io/tpot/"
            )

        self.output_path = output_path
        pathlib.Path(self.output_path).mkdir(exist_ok=True)

        # for the REPTILE models
        self.model_dir = f"{self.output_path}/model"
        pathlib.Path(self.model_dir).mkdir(exist_ok=True)

        # for final prediction results
        self.bigwig_dir = f"{self.output_path}/bigwig"
        pathlib.Path(self.bigwig_dir).mkdir(exist_ok=True)

        # for final enhancer BED files
        self.enhancer_dir = f"{self.output_path}/enhancer"
        pathlib.Path(self.enhancer_dir).mkdir(exist_ok=True)

        self.train_regions = train_regions
        self.dmr_regions = dmr_regions
        self.train_region_labels = train_region_labels
        self.train_sample = train_sample
        self.chrom_size_path = chrom_size_path
        self.window_size = window_size
        self.step_size = step_size
        self.dmr_slop = dmr_slop
        if fillna_by_zero is None:
            fillna_by_zero = []
        self.fillna_by_zero = fillna_by_zero

        self.bigwig_table: pd.DataFrame
        if isinstance(bigwig_table, (str, pathlib.Path)):
            bigwig_table = str(bigwig_table)
            if bigwig_table.endswith("tsv"):
                sep = "\t"
            else:
                sep = ","
            self.bigwig_table = pd.read_csv(bigwig_table, sep=sep, index_col=0)
        else:
            self.bigwig_table = bigwig_table
        self.modalities = self.bigwig_table.columns
        print(f"Got {self.modalities.size} modalities from bigwig_table: ", end="")
        print(", ".join(self.modalities))

        if train_sample not in self.bigwig_table.index:
            raise KeyError(
                f"train_sample {train_sample} does not exist in the index of bigwig_table"
            )
        self.samples = pd.Index(
            [s for s in self.bigwig_table.index if s != train_sample]
        )
        print("Training sample:", self.train_sample)
        print("Other samples:", ", ".join(self.samples))

        # four RegionDS
        self._train_region_ds = None
        self._train_dmr_ds = None
        self._query_region_ds = None
        self._query_dmr_ds = None
        self.region_dims = ["train-region", "train-dmr", "query-region", "query-dmr"]

        # two models
        self._region_model = None
        self._dmr_model = None

        # prediction results
        self._region_prediction = None
        self._dmr_prediction = None

        # check if the default RegionDS exist, otherwise generate them
        try:
            assert pathlib.Path(f"{self.output_path}/train-region").exists()
            assert pathlib.Path(f"{self.output_path}/train-dmr").exists()
            assert pathlib.Path(f"{self.output_path}/query-region").exists()
            assert pathlib.Path(f"{self.output_path}/query-dmr").exists()
        except AssertionError:
            self.generate_region_ds()
        return

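    # A hedged sketch of the expected bigwig_table layout, with hypothetical
    # sample and modality names and paths: rows are samples, columns are
    # modalities, and each cell is a path to a BigWig track.
    #
    #     sample    mCG                    ATAC
    #     mESC      /tracks/mESC.mCG.bw    /tracks/mESC.atac.bw
    #     NPC       /tracks/NPC.mCG.bw     /tracks/NPC.atac.bw
    #
    # The training sample (e.g., "mESC") must appear in the index; every other
    # row is treated as a query sample.
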
    def generate_region_ds(self):
        # RegionDS for training
        # step 1. Create the training region RegionDS
        train_regions_bed, train_label = _create_train_region_ds(self)
        # step 2. Create the RegionDS of DMRs overlapping the training regions
        dmr_regions_bed_df = _create_train_dmr_ds(
            self, train_regions_bed=train_regions_bed, train_label=train_label
        )

        # RegionDS for query
        # step 3. Create the RegionDS of all DMRs for query
        _create_query_dmr_ds(self, dmr_regions_bed_df=dmr_regions_bed_df)
        # step 4. Create the RegionDS of genome-wide windows
        # (self.window_size, self.step_size) for query
        _create_query_region_ds(self)
        return

    @property
    def train_region_ds(self):
        if self._train_region_ds is None:
            self._train_region_ds = RegionDS.open(
                self.output_path, region_dim="train-region", engine="zarr"
            )
        return self._train_region_ds

    @property
    def train_dmr_ds(self):
        if self._train_dmr_ds is None:
            self._train_dmr_ds = RegionDS.open(
                self.output_path, region_dim="train-dmr", engine="zarr"
            )
        return self._train_dmr_ds

    @property
    def query_region_ds(self):
        if self._query_region_ds is None:
            self._query_region_ds = RegionDS.open(
                self.output_path, region_dim="query-region", engine="zarr"
            )
        return self._query_region_ds

    @property
    def query_dmr_ds(self):
        if self._query_dmr_ds is None:
            self._query_dmr_ds = RegionDS.open(self.output_path, region_dim="query-dmr")
        return self._query_dmr_ds

    @property
    def region_model(self):
        if self._region_model is None:
            try:
                self._region_model = joblib.load(
                    f"{self.model_dir}/train-region_model.lib"
                )
            except FileNotFoundError:
                print(
                    "Region model not found, make sure you train the model before prediction"
                )
        return self._region_model

    @property
    def dmr_model(self):
        if self._dmr_model is None:
            try:
                self._dmr_model = joblib.load(f"{self.model_dir}/train-dmr_model.lib")
            except FileNotFoundError:
                print(
                    "DMR model not found, make sure you train the model before prediction"
                )
        return self._dmr_model

    def _validate_region_name(self, name):
        if name not in self.region_dims:
            raise KeyError(f"Only accept {self.region_dims}, got {name}")

    def annotate_by_bigwigs(self, name, slop, cpu, redo=False):
        attribute_name = f"{name.replace('-', '_')}_ds"
        region_ds: RegionDS = self.__getattribute__(attribute_name)
        if region_ds.attrs.get("reptile_annotate") and not redo:
            print(f"{name} already annotated")
            return

        for modality, bigwig_paths in self.bigwig_table.items():
            print(f"Annotating regions with {modality} BigWigs.")
            region_ds.annotate_by_bigwigs(
                bigwig_paths.to_dict(),
                dim=modality,
                slop=slop,  # None for regions, 150 for DMRs
                value_type="mean",
                chunk_size="auto",
                dtype="float32",
                cpu=cpu,
            )
            da_name = f"{region_ds.region_dim}_{modality}_da"
            if modality in self.fillna_by_zero:
                region_ds[da_name] = region_ds[da_name].fillna(0)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                region_ds.coords[f"{modality}_mean"] = (
                    region_ds[da_name]
                    .sel({modality: self.samples})
                    .mean(dim=modality, skipna=True)
                    .load()
                )
                region_ds.coords[f"{modality}_std"] = (
                    region_ds[da_name]
                    .sel({modality: self.samples})
                    .std(dim=modality, skipna=True)
                    .load()
                )
                region_ds.coords[f"{modality}_nan_count"] = (
                    np.isnan(region_ds[da_name].sel({modality: self.samples}))
                    .sum(dim=modality)
                    .load()
                )

        # save the annotated region_ds to {self.output_path}/{region_dim}
        region_ds.attrs["reptile_annotate"] = True
        region_ds.save(mode="a")
        for modality in self.modalities:
            subprocess.run(f"rm -rf {self.output_path}/{name}_{modality}", shell=True)

        # need to re-open the RegionDS, otherwise the annotated da is missing;
        # it will be reopened on the next access of this attribute
        self.__setattr__(f"_{attribute_name}", None)
        return

    def _filter_na_train(self, name, sample, max_na_rate=0.5):
        self._validate_region_name(name)
        attribute_name = f"{name.replace('-', '_')}_ds"
        region_ds: RegionDS = self.__getattribute__(attribute_name)
        for modality in self.modalities:
            if modality not in region_ds.dims:
                raise KeyError(
                    f"{name} RegionDS missing {modality} annotation, "
                    f"make sure you run REPTILE.annotate_by_bigwigs before training."
                )

        judges = []
        for data_var, da in region_ds.data_vars.items():
            modality = data_var.split("_")[1]
            if modality in self.fillna_by_zero:
                # for count-based data, NaN means 0 count, no need to filter
                continue
            # remove regions having no coverage in the training sample
            nan_sample = np.isnan(da.sel({modality: sample})).to_pandas()
            # or regions where more than max_na_rate of the other samples are NaN
            nan_other = da.coords[f"{modality}_nan_count"].to_pandas()
            nan_other = nan_other > (len(self.samples) * max_na_rate)
            remove_feature = nan_sample | nan_other
            judges.append(remove_feature)

        if len(judges) > 0:
            remove_feature = np.any(judges, axis=0)
            region_ds = region_ds.sel({region_ds.region_dim: ~remove_feature})
            print(
                f"{(~remove_feature).sum()} features remained after filtering NaN for {name}"
            )
        else:
            # this means all modalities have NaN filled by 0
            pass
        return region_ds

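    # A hedged worked example of the filter above, with made-up numbers: given
    # 10 non-training samples and the default max_na_rate=0.5, a region is
    # removed if its signal is NaN in the training sample, or if its nan_count
    # among the other samples exceeds 10 * 0.5 = 5. A region failing the test
    # for any non-fillna_by_zero modality is dropped.
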
    def prepare_training_input(self, name):
        if name not in ["train-region", "train-dmr"]:
            raise ValueError(f'Only accept ["train-region", "train-dmr"], got {name}')

        # get the RegionDS with NaN-containing regions filtered out
        # (see self._filter_na_train)
        region_ds = self._filter_na_train(
            name=name, sample=self.train_sample, max_na_rate=0.5
        )
        data, label = _get_data_and_label(
            region_ds=region_ds,
            modalities=self.modalities,
            sample=self.train_sample,
            fillna_by_zero_list=self.fillna_by_zero,
        )
        return data, label

    @staticmethod
    def auto_ml(
        data,
        label,
        output_path,
        train_size=0.75,
        random_state=42,
        cpu=1,
        tpot_generations=5,
        tpot_max_time_mins=60,
        **tpot_kwargs,
    ):
        print("Training model with these parameters:")
        print(f"cpu={cpu}")
        print(f"train_size={train_size}")
        print(f"random_state={random_state}")
        print(f"generations={tpot_generations}")
        print(f"max_time_mins={tpot_max_time_mins}")
        for k, v in tpot_kwargs.items():
            print(f"{k}={v}")

        x_train, x_test, y_train, y_test = train_test_split(
            data.astype(np.float64),
            label.astype(np.float64),
            train_size=train_size,
            test_size=1 - train_size,
            random_state=random_state,
        )

        from tpot import TPOTClassifier

        _tpot = TPOTClassifier(
            generations=tpot_generations,
            max_time_mins=tpot_max_time_mins,
            verbosity=2,
            n_jobs=cpu,
            random_state=random_state,
            **tpot_kwargs,
        )
        _tpot.fit(x_train, y_train)
        final_score = _tpot.score(x_test, y_test)
        print(f"Final hold-out testing data accuracy: {final_score:.4f}")
        print("Final pipeline:")
        print(_tpot.fitted_pipeline_)

        # save the model for prediction
        joblib.dump(_tpot.fitted_pipeline_, output_path)
        return

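    # A hedged example of extra keyword arguments one might forward through
    # tpot_kwargs; config_dict and population_size are standard TPOTClassifier
    # parameters, but their suitability for this task is an assumption:
    #
    #     REPTILE.auto_ml(data, label, "model.lib", cpu=8,
    #                     config_dict="TPOT light", population_size=20)
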
    def _train(self, region_dim, slop, cpu, **kwargs):
        # step 1: annotate the RegionDS
        self.annotate_by_bigwigs(region_dim, slop=slop, cpu=cpu)

        # step 2: prepare the data matrix and filter NaN
        data, label = self.prepare_training_input(region_dim)

        # step 3: perform AutoML training and get the final model
        model_output_path = f"{self.model_dir}/{region_dim}_model.lib"
        # pass cpu through, otherwise auto_ml doesn't receive this parameter
        kwargs["cpu"] = cpu
        self.auto_ml(data=data, label=label, output_path=model_output_path, **kwargs)
        return model_output_path

    def train_region_model(self, slop=None, cpu=1, **kwargs):
        region_dim = "train-region"
        print("Training model for genomic regions.")
        model_path = self._train(region_dim, slop, cpu=cpu, **kwargs)
        self._region_model = joblib.load(model_path)
        return

    def train_dmr_model(self, slop=None, cpu=1, **kwargs):
        region_dim = "train-dmr"
        print("Training model for DMR regions.")
        if slop is None:
            slop = self.dmr_slop
        print(f"Using slop {slop} for DMRs")
        model_path = self._train(region_dim, slop=slop, cpu=cpu, **kwargs)
        self._dmr_model = joblib.load(model_path)
        return

    def fit(self, cpu=10, **kwargs):
        """Convenience function to train both models with default parameters."""
        self.train_region_model(cpu=cpu, **kwargs)
        self.train_dmr_model(cpu=cpu, **kwargs)
        return

    def _predict(self, region_dim, cpu, mask_cutoff):
        if region_dim == "query-region":
            model = self.region_model
            slop = None
        else:
            model = self.dmr_model
            slop = self.dmr_slop
        print(f"Perform {region_dim} prediction using the fitted model:")
        print(model)

        # step 1: annotate the RegionDS
        self.annotate_by_bigwigs(region_dim, slop=slop, cpu=cpu)

        # step 2: prepare input and run prediction
        chunk_dir = f"{self.output_path}/.{region_dim}_prediction_chunks"
        pathlib.Path(chunk_dir).mkdir(exist_ok=True)
        final_dir = f"{self.output_path}/{region_dim}_prediction"

        with ProcessPoolExecutor(cpu) as exe:
            futures = {}
            for chunk_i, sample in enumerate(self.samples):
                sample_output_path = f"{chunk_dir}/{sample}.zarr"
                future = exe.submit(
                    _predict_sample,
                    region_ds_path=self.output_path,
                    region_dim=region_dim,
                    modalities=self.modalities,
                    fillna_by_zero=self.fillna_by_zero,
                    sample=sample,
                    output_path=sample_output_path,
                    chunk_size=500000,
                    mask_cutoff=mask_cutoff,
                )
                futures[future] = chunk_i

            chunks_to_write = {}
            for future in as_completed(futures):
                chunks_to_write[futures[future]] = future.result()

        write_ordered_chunks(
            chunks_to_write,
            final_path=final_dir,
            append_dim="sample",
            engine="zarr",
            coord_dtypes=None,
            dtype=None,
        )
        final_da = xr.open_zarr(final_dir)
        subprocess.run(f"rm -rf {chunk_dir}", shell=True)

        update_dataset_config(
            output_dir=self.output_path,
            add_ds_region_dim={f"{region_dim}_prediction": region_dim},
            change_region_dim=None,
        )

        if region_dim == "query-region":
            self._region_prediction = final_da
        else:
            self._dmr_prediction = final_da
        return

    def predict(self, cpu, mask_cutoff=0.3, bw_bin_size=10, enhancer_cutoff=0.7):
        self._predict(region_dim="query-region", cpu=cpu, mask_cutoff=mask_cutoff)
        self._predict(region_dim="query-dmr", cpu=cpu, mask_cutoff=mask_cutoff)

        # loading all region ids is quite memory intensive;
        # besides, this step is not time-consuming
        self.dump_bigwigs(
            cpu=min(cpu, 5), mask_cutoff=mask_cutoff, bw_bin_size=bw_bin_size
        )

        # call enhancers with enhancer_cutoff, merging enhancers within one step size
        self.call_enhancers(threshold=enhancer_cutoff, merge_dist=self.step_size)
        return

    def _dump_sample(self, sample, mask_cutoff, bw_bin_size):
        # set dask scheduler to allow multiprocessing
        with dask.config.set(scheduler="sync"):
            dmr_pred = RegionDS.open(self.output_path, region_dim="query-dmr")
            region_pred = RegionDS.open(self.output_path, region_dim="query-region")

            # save DMR prediction proba
            dmr_bed_df = dmr_pred.get_bed(with_id=False)
            dmr_value = dmr_pred.get_feature(
                sample, dim="sample", da_name="query-dmr_prediction"
            )
            dmr_bed_df["score"] = dmr_value
            dmr_bed_df = dmr_bed_df[dmr_bed_df["score"] > mask_cutoff].copy()
            dmr_bed_df.sort_values(["chrom", "start"], inplace=True)
            dmr_bed_df.to_csv(
                f"{self.bigwig_dir}/{sample}_dmr_pred.bg",
                sep="\t",
                index=None,
                header=None,
            )

            # save region prediction proba
            region_bed_df = region_pred.get_bed(with_id=False)
            region_value = region_pred.get_feature(
                sample, dim="sample", da_name="query-region_prediction"
            )
            region_bed_df["score"] = region_value
            region_bed_df = region_bed_df[region_bed_df["score"] > mask_cutoff].copy()
            region_bed_df.sort_values(["chrom", "start"], inplace=True)
            region_bed_df.to_csv(
                f"{self.bigwig_dir}/{sample}_region_pred.bg",
                sep="\t",
                index=None,
                header=None,
            )

            # use bedtools unionbedg to aggregate scores
            subprocess.run(
                f"bedtools unionbedg -i "
                f"{self.bigwig_dir}/{sample}_dmr_pred.bg "
                f"{self.bigwig_dir}/{sample}_region_pred.bg > "
                f"{self.bigwig_dir}/{sample}_unionbedg.tsv",
                shell=True,
                check=True,
            )
            union_table = pd.read_csv(
                f"{self.bigwig_dir}/{sample}_unionbedg.tsv", sep="\t", header=None
            )

            bw_path = f"{self.bigwig_dir}/{sample}_reptile_score.bw"
            with pyBigWig.open(bw_path, "w") as bw:
                chrom_sizes = pd.read_csv(
                    self.chrom_size_path,
                    sep="\t",
                    index_col=0,
                    header=None,
                    squeeze=True,
                ).sort_index()
                bw.addHeader([(k, v) for k, v in chrom_sizes.items()])
                for chrom in chrom_sizes.index:
                    cur_bin = 0
                    cur_scores = [0]
                    for _, row in union_table[union_table[0] == chrom].iterrows():
                        chrom, start, end, *scores = row
                        score = max(map(float, scores))
                        start_bin = int(start) // bw_bin_size
                        end_bin = int(end) // bw_bin_size + 1
                        for bin_id in range(start_bin, end_bin):
                            if bin_id > cur_bin:
                                # save the previous bin
                                cur_pos = cur_bin * bw_bin_size
                                mean_score = sum(cur_scores) / len(cur_scores)
                                try:
                                    if mean_score > 0.01:
                                        bw.addEntries(
                                            chrom,
                                            [cur_pos],
                                            values=[mean_score],
                                            span=bw_bin_size,
                                        )
                                except RuntimeError as e:
                                    print(chrom, cur_pos, mean_score, bw_bin_size)
                                    raise e
                                # init a new bin
                                cur_bin = bin_id
                                cur_scores = [score]
                            elif bin_id == cur_bin:
                                # the same bin, take the average
                                cur_scores.append(score)
                            else:
                                # no score, initial state
                                pass
                    # final bin
                    cur_pos = cur_bin * bw_bin_size
                    mean_score = sum(cur_scores) / len(cur_scores)
                    try:
                        if mean_score > 0.01:
                            bw.addEntries(
                                chrom, [cur_pos], values=[mean_score], span=bw_bin_size
                            )
                    except RuntimeError as e:
                        print(chrom, cur_pos, mean_score, bw_bin_size)
                        raise e

            subprocess.run(
                f"rm -f {self.bigwig_dir}/{sample}_dmr_pred.bg "
                f"{self.bigwig_dir}/{sample}_region_pred.bg "
                f"{self.bigwig_dir}/{sample}_unionbedg.tsv",
                shell=True,
            )
        return bw_path

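    # A hedged worked example of the binning above, with made-up values and
    # bw_bin_size=10: for each unionbedg row, the score is the max across the
    # DMR and region tracks; a row covering [5, 25) with score 0.8 then touches
    # bins 0, 1, and 2 (positions 0-10, 10-20, 20-30). Rows landing in the same
    # bin are averaged, and bins whose mean score is <= 0.01 are skipped when
    # writing the BigWig.
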
    def dump_bigwigs(self, cpu, mask_cutoff, bw_bin_size):
        print(
            f"Save prediction results to bigwig files: \n"
            f"bin size {bw_bin_size}, minimum score {mask_cutoff}."
        )
        with ProcessPoolExecutor(cpu) as exe:
            futures = {}
            for sample in self.samples:
                f = exe.submit(
                    self._dump_sample,
                    sample=sample,
                    mask_cutoff=mask_cutoff,
                    bw_bin_size=bw_bin_size,
                )
                futures[f] = sample

            for f in as_completed(futures):
                sample = futures[f]
                print(f"{sample} result dumped to: ", end="")
                bw_path = f.result()
                print(bw_path)
        return

    def call_enhancers(self, threshold=0.7, merge_dist=None):
        if merge_dist is None:
            merge_dist = self.step_size
        print(
            f"Call final enhancers with threshold {threshold}, "
            f"merging enhancers within {merge_dist} bp"
        )
        dmr_bed = pybedtools.BedTool(self.dmr_regions).sort(g=self.chrom_size_path)
        for sample in self.samples:
            bw_path = f"{self.bigwig_dir}/{sample}_reptile_score.bw"
            enhancer_df = _call_enhancer_region(
                bw_path=bw_path,
                dmr_bed=dmr_bed,
                threshold=threshold,
                merge_dist=merge_dist,
                chrom_size_path=self.chrom_size_path,
            )
            sample_out_path = f"{self.enhancer_dir}/{sample}_enhancer.bed"
            enhancer_df.to_csv(sample_out_path, sep="\t", index=None, header=None)
            print(f"Sample {sample} has {enhancer_df.shape[0]} enhancers.")
        return

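
# ---------------------------------------------------------------------------
# A hedged end-to-end usage sketch, not part of the library. All paths and
# sample names below are hypothetical placeholders; the call order
# (construct -> fit -> predict) follows the methods defined above.
if __name__ == "__main__":
    reptile = REPTILE(
        output_path="reptile_out",  # hypothetical output directory
        train_regions="train_regions.bed",  # hypothetical BED of labeled train regions
        dmr_regions="dmrs.bed",  # hypothetical BED of DMRs
        train_region_labels="train_labels.tsv",  # hypothetical 2-column TSV (see above)
        train_sample="mESC",  # hypothetical training sample name
        bigwig_table="bigwigs.csv",  # hypothetical sample-by-modality BigWig table
        chrom_size_path="mm10.chrom.sizes",  # hypothetical chrom sizes file
    )
    # train the region and DMR models with TPOT AutoML
    reptile.fit(cpu=10)
    # predict enhancer probabilities, dump BigWigs, and call final enhancers
    reptile.predict(cpu=10, mask_cutoff=0.3, bw_bin_size=10, enhancer_cutoff=0.7)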