from Bio import motifs, SeqIO
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import numpy as np
import xarray as xr
import pathlib
import subprocess
from ..mcds.utilities import write_ordered_chunks
from ..mcds import RegionDS
[docs]def _get_motif_threshold(motif, method, threshold_value):
if method == "patser":
value = motif.pssm.distribution().threshold_patser()
elif method == "balanced":
value = motif.pssm.distribution().threshold_balanced(threshold_value)
elif method == "fpr":
value = motif.pssm.distribution().threshold_fpr(threshold_value)
elif method == "fnr":
value = motif.pssm.distribution().threshold_fnr(threshold_value)
else:
raise ValueError(
f'Method needs to be ["patser", "balanced", "fpr", "fnr"], got {method}. '
f"Check this page for more details: https://biopython-tutorial.readthedocs.io/en/latest/"
f"notebooks/14%20-%20Sequence%20motif%20analysis%20using%20Bio.motifs.html"
f"#Selecting-a-score-threshold"
)
return value
[docs]def _single_seq_single_motif_max_score(seq, motif, threshold):
if len(seq) < motif.length:
return 0, 0, 0
# position and direction information can be collected in motif.pssm.search
# but not used here
scores = [i[1] for i in motif.pssm.search(seq.seq, threshold=threshold)]
n_sites = len(scores)
if n_sites == 0:
max_score = 0
total_score = 0
else:
max_score = max(scores)
total_score = sum(scores)
return n_sites, max_score, total_score
[docs]class MotifSet:
def __init__(self, motifs, meta_table=None, motif_cluster_col="cluster_id"):
self.motif_list = motifs
self.motif_dict = {motif.name: motif for motif in self.motif_list}
self.motif_name_list = [motif.name for motif in self.motif_list]
self.n_motifs = len(self.motif_list)
self.thresholds = {}
# meta table index is not unique, because one motif may corresponding to multiple gene
self.meta_table = meta_table
# add motif cluster and remove duplicates
if meta_table is not None:
self.motif_cluster = pd.Series(meta_table[motif_cluster_col].to_dict())
self.motif_cluster.index.name = "motif"
[docs] def calculate_threshold(self, method="balanced", cpu=1, threshold_value=1000):
# execute in parallel
with ProcessPoolExecutor(cpu) as exe:
futures = {}
for motif in self.motif_list:
f = exe.submit(
_get_motif_threshold,
method=method,
motif=motif,
threshold_value=threshold_value,
)
futures[f] = motif.name
count = 0
print("Calculating motif threshold: ", end="")
for f in as_completed(futures):
threshold = f.result()
name = futures[f]
self.thresholds[name] = threshold
count += 1
if count % 250 == 0:
print(count, end=" ")
print()
return
[docs] def _multi_seq_motif_scores(
self, seq_list, save_path, region_dim, motif_dim, dtype
):
seq_names = []
seq_scores = []
for seq in seq_list:
this_seq_scores = []
for motif in self.motif_list:
threshold = self.thresholds[motif.name]
n_max_total_score = _single_seq_single_motif_max_score(
seq=seq, motif=motif, threshold=threshold
)
this_seq_scores.append(n_max_total_score)
seq_scores.append(this_seq_scores)
seq_names.append(seq.name)
seq_scores = np.round(seq_scores)
max_value = np.iinfo(dtype).max
seq_scores[seq_scores > max_value] = max_value
_data = np.array(seq_scores).astype(dtype)
da = xr.DataArray(
_data,
coords=[
seq_names,
self.motif_name_list,
["n_motifs", "max_score", "total_score"],
],
dims=[region_dim, motif_dim, "motif_value"],
)
motif_score_matrix = xr.Dataset({f"{region_dim}_{motif_dim}_da": da})
motif_score_matrix.to_zarr(save_path)
return save_path
[docs] def _run_motif_scan_chunks(
self,
fasta_path,
output_dir,
region_dim,
motif_dim,
dtype,
chunk_size=10000,
cpu=1,
):
with open(fasta_path) as fasta:
seqs = []
for seq in SeqIO.parse(fasta, "fasta"):
seqs.append(seq)
print(f"Scan {self.n_motifs} motif in {len(seqs)} sequences.")
if chunk_size is None:
chunk_size = min(10000, max(10, len(seqs) // cpu))
output_dir = pathlib.Path(output_dir).absolute()
final_dir = f"{output_dir}/{region_dim}_{motif_dim}"
chunk_dir = f"{output_dir}/.{region_dim}_{motif_dim}_chunk"
if pathlib.Path(chunk_dir).exists():
subprocess.run(f"rm -rf {chunk_dir}", shell=True)
pathlib.Path(chunk_dir).mkdir(parents=True)
with ProcessPoolExecutor(cpu) as exe:
futures = {}
for chunk_i, chunk_start in enumerate(range(0, len(seqs), chunk_size)):
save_path = f"{chunk_dir}/motif_{chunk_i}.zarr"
_seqs = seqs[chunk_start: chunk_start + chunk_size]
future = exe.submit(
self._multi_seq_motif_scores,
seq_list=_seqs,
save_path=str(save_path),
region_dim=region_dim,
motif_dim=motif_dim,
dtype=dtype,
)
futures[future] = chunk_i
chunks_to_write = {}
for i, future in enumerate(as_completed(futures)):
print(f"Job {i} returned")
chunk_i = futures[future]
output_path = future.result()
chunks_to_write[chunk_i] = output_path
write_ordered_chunks(
chunks_to_write,
final_path=final_dir,
append_dim=region_dim,
engine="zarr",
coord_dtypes=None,
)
subprocess.run(f"rm -rf {chunk_dir}", shell=True)
return
[docs] def _aggregate_motif_clusters(
self, output_dir, motif_dim, region_dim, dtype, chunk_size=1000000
):
motif_ds = RegionDS.open(
f"{output_dir}/{region_dim}_{motif_dim}/", region_dim=region_dim
)
_motif_cluster = self.motif_cluster
_motif_cluster.name = motif_dim
motif_ds.coords[f"{motif_dim}-cluster"] = _motif_cluster
motif_cluster_ds = motif_ds.groupby(f"{motif_dim}-cluster").apply(
lambda _i: _i.max(dim=motif_dim).astype(dtype)
)
# save motif cluster matrix by chunk
motif_cluster_ds = RegionDS(motif_cluster_ds)
cluster_output_dir = f"{output_dir}/{region_dim}_{motif_dim}-cluster"
for i, chunk in enumerate(
motif_cluster_ds.iter_array(
dim=region_dim, chunk_size=chunk_size, da="dmr_motif_da", load=True
)
):
_ds = xr.Dataset({"dmr_motif-cluster_da": chunk}).load()
if i == 0:
_ds.to_zarr(cluster_output_dir, mode="w")
else:
_ds.to_zarr(cluster_output_dir, append_dim=region_dim)
return
[docs] def scan_motifs(
self,
fasta_path,
output_dir,
cpu,
region_dim,
motif_dim="motif",
combine_cluster=True,
dtype="uint16",
chunk_size=10000,
):
self._run_motif_scan_chunks(
fasta_path=fasta_path,
output_dir=output_dir,
cpu=cpu,
region_dim=region_dim,
motif_dim=motif_dim,
dtype=dtype,
chunk_size=chunk_size,
)
motif_ds = RegionDS.open(
f"{output_dir}/{region_dim}_{motif_dim}", region_dim=region_dim
)
if combine_cluster:
self._aggregate_motif_clusters(
output_dir=output_dir,
motif_dim=motif_dim,
region_dim=region_dim,
dtype=dtype,
chunk_size=chunk_size,
)
motif_cluster_ds = RegionDS.open(
f"{output_dir}/{region_dim}_{motif_dim}-cluster", region_dim=region_dim
)
motif_ds.update(motif_cluster_ds)
return motif_ds
[docs]class Motif(motifs.Motif):
def __init__(self, alphabet="ACGT", instances=None, counts=None):
super().__init__(alphabet=alphabet, instances=instances, counts=counts)