Source code for ALLCools.clustering.pvclust

import pandas as pd
import numpy as np
from scipy.cluster.hierarchy import dendrogram
import matplotlib.pyplot as plt
import joblib


[docs]def install_r_package(name): from rpy2.robjects.vectors import StrVector from rpy2.robjects.packages import importr, isinstalled if not isinstalled(name): utils = importr("utils") utils.chooseCRANmirror(ind=1) utils.install_packages(StrVector([name]))
[docs]def _hclust_to_scipy_linkage(result, plot=True): """Turn R hclust result obj into scipy linkage matrix format""" # in hclust merge matrix, negative value is for singleton raw_linkage = pd.DataFrame(np.array(result[0])) nobs = raw_linkage.shape[0] + 1 raw_linkage[2] = np.array(result[1]) raw_linkage.index = raw_linkage.index + nobs # in hclust merge matrix, positive value is for non-singleton scipy_linkage = raw_linkage.copy() scipy_linkage[raw_linkage.iloc[:, :2] < 0] += nobs scipy_linkage[raw_linkage.iloc[:, :2] > 0] += nobs - 1 total_obs = nobs # add the 4th col: number of singleton cluster_dict = {} labels = list(range(total_obs)) for cur_cluster_id, (left, right, distance) in scipy_linkage.iterrows(): left = int(left) right = int(right) cluster_dict[cur_cluster_id] = {"left": set(), "right": set()} if (left < total_obs) and (right < total_obs): left = labels[left] right = labels[right] # merge of 2 original observations cluster_dict[cur_cluster_id]["left"].add(left) cluster_dict[cur_cluster_id]["right"].add(right) else: # left and/or right are cluster if left < total_obs: left = labels[left] cluster_dict[cur_cluster_id]["left"].add(left) else: # node are cluster cluster_dict[cur_cluster_id]["left"].update(cluster_dict[left]["left"]) cluster_dict[cur_cluster_id]["left"].update(cluster_dict[left]["right"]) if right < total_obs: right = labels[right] cluster_dict[cur_cluster_id]["right"].add(right) else: # node are cluster cluster_dict[cur_cluster_id]["right"].update( cluster_dict[right]["left"] ) cluster_dict[cur_cluster_id]["right"].update( cluster_dict[right]["right"] ) cur_cluster_id += 1 cluster_records = {} for cluster, _sub_dict in cluster_dict.items(): total_n = len(_sub_dict["left"]) + len(_sub_dict["right"]) cluster_records[cluster] = total_n scipy_linkage[3] = pd.Series(cluster_records) # dendrogram orders = list(result[2]) labels = list(result[3]) # correct order of the final dendrogram r_order = [labels[i - 1] for i in orders] dendro = dendrogram(scipy_linkage.values, no_plot=True) python_order = ( pd.Series({a: b for a, b in zip(dendro["leaves"], r_order)}) .sort_index() .tolist() ) # python_order = [i[1:] for i in python_order] if plot: fig, ax = plt.subplots(dpi=300) dendro = dendrogram( scipy_linkage.values, labels=tuple(python_order), no_plot=False, ax=ax ) ax.xaxis.set_tick_params(rotation=90) else: dendro = dendrogram( scipy_linkage.values, labels=tuple(python_order), no_plot=True ) return scipy_linkage, python_order, dendro
[docs]class Dendrogram: def __init__( self, nboot=1000, method_dist="correlation", method_hclust="average", n_jobs=-1 ): self.nboot = nboot self.method_dist = method_dist self.method_hclust = method_hclust self.n_jobs = n_jobs self.linkage = None self.label_order = None self.dendrogram = None self.edge_stats = None
[docs] def fit(self, data): """ Parameters ---------- data The data is in obs-by-var form, row is obs. Returns ------- """ try: import rpy2.robjects as ro from rpy2.robjects import pandas2ri from rpy2.robjects.conversion import localconverter from rpy2.robjects.packages import importr except ImportError: raise ImportError('Got rpy2 import error, please make sure R, rpy2 and their dependencies are installed. ' 'If not, use conda or mamba to install rpy2') importr("base") install_r_package("pvclust") pvclust = importr("pvclust") with localconverter(ro.default_converter + pandas2ri.converter): # The data is in obs-by-var form, row is obs. Transpose for R. r_df = ro.conversion.py2rpy(data.T) if self.n_jobs == -1: self.n_jobs = True result = pvclust.pvclust( r_df, nboot=self.nboot, method_dist=self.method_dist, method_hclust=self.method_hclust, parallel=self.n_jobs, ) # dendrogram info hclust = result[0] linkage, label_order, dendro = _hclust_to_scipy_linkage(hclust, plot=False) self.linkage = linkage self.label_order = label_order self.dendrogram = dendro # scores of edges by pvclust bootstrap edge_stats = pd.DataFrame(result[1], index=result[1].colnames).T edge_stats.index = linkage.index child_dict = {} # pvclust edge stat is only for parents, here turn it into child basis for parent, (left, right, *_) in linkage.iterrows(): child_dict[int(left)] = edge_stats.loc[parent] child_dict[int(right)] = edge_stats.loc[parent] self.edge_stats = pd.DataFrame(child_dict).T.sort_index() return
[docs] def save(self, output_path): joblib.dump(self, output_path)