Source code for ALLCools.sandbox.motif.ame

import pathlib
import shlex
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np
import pandas as pd

from .utilities import split_meme_motif_file
from ..dmr.get_fasta import get_fasta


[docs]def _single_ame_runner(fasta_path, motif_paths, ame_kws_str=''): if isinstance(motif_paths, list): motif_paths = ' '.join([str(i) for i in motif_paths]) cmd = f'ame {ame_kws_str} {fasta_path} {motif_paths} ' try: subprocess.run(shlex.split(cmd), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf8') except subprocess.CalledProcessError as e: print(e.stderr) raise e return cmd
[docs]def ame(bed_file, motif_files, output_path, reference_fasta, background_files=None, cpu=1, standard_length=None, chrom_size_path=None, ame_kws=None, sample=None, seed=1): """\ Motif enrichment analysis with AME from MEME Suite. Parameters ---------- bed_file Single bed file input to calculate motif enrichment motif_files One or multiple motif files in MEME format output_path Output path of the AME result reference_fasta Reference fasta of the bed_file and background_files background_files One or multiple bed files contain region as the control set of motif enrichment cpu Number of CPUs to parallel AME standard_length If not None, will standard the input region and control region (if provided) in to same standard_length, each standardized region is centered to the original region center. chrom_size_path {chrom_size_path_doc} Must provide is standard_length is not None. ame_kws Additional options that will pass to AME, provide either a dict or string. If string provided, will directly insert into the final AME command, see AME documentation about AME options. e.g. '--seed 1 --method fisher --scoring avg' sample Sample input regions to this number to reduce run time or test seed Seed for random sample input regions, only apply when sample is not None Returns ------- """ # test related software try: subprocess.run(['ame', '--version'], check=True) except subprocess.CalledProcessError as e: print('Make sure AME from MEME suite is correctly installed.') raise e try: subprocess.run(['bedtools', '--version'], check=True) # used in get_fasta except subprocess.CalledProcessError as e: print('Make sure bedtools is correctly installed.') raise e output_path = str(output_path) output_dir = pathlib.Path(output_path + '_ame_temp') output_dir.mkdir(exist_ok=True) if isinstance(background_files, str): background_files = [background_files] # get bed FASTQ df = pd.read_csv(bed_file, sep='\t', header=None) print(df.shape[0], 'total regions') if sample is not None and sample <= df.shape[0]: df = df.sample(sample, random_state=seed) df.to_csv(output_dir / 'INPUT_BED_SAMPLED.bed', sep='\t', index=None, header=None) bed_file = output_dir / 'INPUT_BED_SAMPLED.bed' n_region = df.shape[0] print(n_region, 'input regions') bed_file = pathlib.Path(bed_file) output_fasta_path = output_dir / (bed_file.name + '.fasta') get_fasta([bed_file], fasta_path=reference_fasta, output_path=output_fasta_path, standard_length=standard_length, slop_b=None, chrom_size_path=chrom_size_path, cpu=cpu, merge=False, sort_mem_gbs=min(int(cpu * 3), 30)) # get merged background FASTQ if background_files is None: control_fasta = '--shuffle--' else: control_fasta = output_dir / 'CONTROL_REGIONS.fasta' get_fasta(background_files, fasta_path=reference_fasta, output_path=control_fasta, slop_b=None, standard_length=standard_length, chrom_size_path=chrom_size_path, cpu=cpu, merge=False, sample_region=min(100000, n_region), sort_mem_gbs=min(int(cpu * 3), 30)) # split motif files based on cpu ame_motif_temp_dir = output_dir / 'MOTIF_TEMP' ame_motif_temp_dir.mkdir(exist_ok=True) motif_records = split_meme_motif_file(motif_files, ame_motif_temp_dir) motif_path_chunks = [] step = min(len(motif_records) // cpu + 1, 10) for i in range(0, len(motif_records), step): # j is a list [motif_uid, motif_name, motif_path] # return by split_meme_motif_file motif_path_chunks.append([j[2] for j in motif_records[i:i + step]]) # configure ame options ame_kws_str = '' _ame_kws = {'noseq': '', 'verbose': 1, 'control': control_fasta} if isinstance(ame_kws, dict): _ame_kws.update(ame_kws) elif isinstance(ame_kws, str): ame_kws_str += ame_kws else: pass for k, v in _ame_kws.items(): ame_kws_str += f' --{k} {v}' # run single_bed_single_motif_runner with ProcessPoolExecutor(cpu) as pool: futures = [] for chunk_id, chunk in enumerate(motif_path_chunks): _ame_kws_str = ame_kws_str chunk_output_dir = output_dir / f'{chunk_id}_temp' _ame_kws_str += f' --oc {chunk_output_dir}' future = pool.submit(_single_ame_runner, fasta_path=output_fasta_path, motif_paths=chunk, ame_kws_str=_ame_kws_str) futures.append(future) for future in as_completed(futures): future.result() # merge results records = [] for i in range(len(motif_path_chunks)): ame_out_path = output_dir / f'{i}_temp/ame.tsv' # some rare cases AME output file has no header,I have no idea why # seems to be an AME bug. # So here I read the first line to check if the file has header with open(ame_out_path) as f: has_header = f.readline().startswith('rank') try: if has_header: chunk_df = pd.read_csv(ame_out_path, sep='\t', index_col=0, comment='#').reset_index(drop=True) else: chunk_df = pd.read_csv(ame_out_path, header=None, sep='\t', index_col=0, comment='#').reset_index(drop=True) chunk_df.columns = ['motif_DB', 'motif_ID', 'motif_alt_ID', 'consensus', 'p-value', 'adj_p-value', 'E-value', 'tests', 'FASTA_max', 'pos', 'neg', 'PWM_min', 'TP', '%TP', 'FP', '%FP'] chunk_df.index.name = 'rank' print(i, chunk_df.shape) except pd.errors.EmptyDataError: continue records.append(chunk_df) total_records = pd.concat(records, sort=True) total_records = total_records[total_records['motif_DB'].notna()] total_records = total_records[total_records['%TP'].notna()] total_records['E-value'] = total_records['E-value'].fillna(100).astype(float) total_records = total_records.sort_values('E-value').reset_index(drop=True) total_records['motif_DB'] = total_records['motif_DB'].apply( lambda x: x.split('/')[-1] if isinstance(x, str) else x) # add fold change total_records['TP/FP'] = (total_records['%TP'] + 0.0001) / (total_records['%FP'] + 0.0001) total_records['log2(TP/FP)'] = np.log2(total_records['TP/FP']) # save output total_records.to_csv(output_path, sep='\t') # clean temp dir subprocess.run(['rm', '-rf', output_dir], check=True) return total_records