Source code for ALLCools.sandbox.balanced_knn

import logging
from typing import Tuple, Any

import numpy as np
from numba import jit
from pynndescent import NNDescent
from scipy import sparse
from sklearn.neighbors import NearestNeighbors as _NearestNeighbors


@jit(
    ["float32(float64[:], float64[:])", "float32(float32[:], float32[:])"],
    nopython=True,
    cache=True,
)
def jensen_shannon_divergence(pk: np.ndarray, qk: np.ndarray) -> float:
    """Jensen-Shannon divergence of two normalized probability vectors, in bits."""
    N = pk.shape[0]
    # pk = pk / np.sum(pk)
    # qk = qk / np.sum(qk)
    m = (pk + qk) / 2
    vec = np.zeros(N)
    for i in range(N):
        if pk[i] > 0 and m[i] > 0:
            vec[i] = pk[i] * np.log(pk[i] / m[i])
        elif pk[i] == 0 and m[i] >= 0:
            vec[i] = 0
        else:
            vec[i] = np.inf
    Dpm = np.sum(vec) / np.log(2)
    vec = np.zeros(N)
    for i in range(N):
        if qk[i] > 0 and m[i] > 0:
            vec[i] = qk[i] * np.log(qk[i] / m[i])
        elif qk[i] == 0 and m[i] >= 0:
            vec[i] = 0
        else:
            vec[i] = np.inf
    Dqm = np.sum(vec) / np.log(2)
    return (Dpm + Dqm) / 2
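

# A minimal usage sketch (not part of the original module; the helper name is
# hypothetical). It exercises the textbook properties of the divergence:
# symmetry, boundedness in [0, 1] when expressed in bits, and zero
# self-divergence. Inputs must already be normalized probability vectors.
def _demo_jensen_shannon_divergence():
    pk = np.array([0.5, 0.3, 0.2])
    qk = np.array([0.1, 0.6, 0.3])
    assert np.isclose(
        jensen_shannon_divergence(pk, qk),
        jensen_shannon_divergence(qk, pk),
        atol=1e-6,
    )  # symmetric in its arguments
    assert 0.0 <= jensen_shannon_divergence(pk, qk) <= 1.0  # bounded in bits
    assert jensen_shannon_divergence(pk, pk) < 1e-6  # identical distributions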
@jit( ["float32(float64[:], float64[:])", "float32(float32[:], float32[:])"], nopython=True, cache=True,
[docs]) def jensen_shannon_distance(pk: np.ndarray, qk: np.ndarray) -> float: """ Remarks: pk and qk must already be normalized so that np.sum(pk) == np.sum(qk) == 1 """ return np.sqrt(jensen_shannon_divergence(pk, qk))
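

# A sanity-check sketch (hypothetical helper; assumes scipy >= 1.2, whose
# scipy.spatial.distance.jensenshannon with base=2 computes the same metric).
def _demo_jensen_shannon_distance():
    from scipy.spatial.distance import jensenshannon

    pk = np.array([0.4, 0.4, 0.2])
    qk = np.array([0.2, 0.3, 0.5])
    assert np.isclose(
        jensen_shannon_distance(pk, qk), jensenshannon(pk, qk, base=2), atol=1e-5
    )

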
@jit(nopython=True)
def balance_knn_loop(
    dsi: np.ndarray, dist: np.ndarray, lsi: np.ndarray, maxl: int, k: int
) -> Tuple:
    """Fast and greedy algorithm to balance a K-NN graph so that no node
    is the NN to more than maxl other nodes.

    Arguments
    ---------
    dsi : np.ndarray (samples, K)
        distance-sorted indexes (as returned by sklearn NN)
    dist : np.ndarray (samples, K)
        the actual distances corresponding to the sorted indexes
    lsi : np.ndarray (samples,)
        indexes sorted by descending degree of connectivity (l)
    maxl : int
        max degree of connectivity (from others to self) allowed
    k : int
        number of neighbours in the final graph

    Returns
    -------
    dist_new : np.ndarray (samples, k+1)
        distances to the NN
    dsi_new : np.ndarray (samples, k+1)
        indexes of the NN, first column is the sample itself
    l : np.ndarray (samples)
        l[i] is the number of connections from other samples to the sample i
    """
    assert dsi.shape[1] >= k, "sight needs to be bigger than k"
    # numba signature "Tuple((int64[:,:], float32[:, :], int64[:]))(int64[:,:], int64[:], int64, int64, bool)"
    dsi_new = -1 * np.ones((dsi.shape[0], k + 1), np.int64)  # maybe d.shape[0]
    l = np.zeros(dsi.shape[0], np.int64)
    dist_new = np.zeros(dsi_new.shape, np.float64)
    for i in range(dsi.shape[0]):  # for every node
        el = lsi[i]  # start from the nodes with the highest degree of connectivity
        p = 0
        j = 0
        for j in range(dsi.shape[1]):  # for every candidate within sight
            if p >= k:
                break
            m = dsi[el, j]
            if el == m:
                dsi_new[el, 0] = el
                continue
            # skip the node if it is already connected with maxl nodes
            if l[m] >= maxl:
                continue
            dsi_new[el, p + 1] = m
            l[m] = l[m] + 1
            dist_new[el, p + 1] = dist[el, j]
            p += 1
        # sight exhausted before k neighbours were found: pad with the sample itself
        if (j == dsi.shape[1] - 1) and (p < k):
            while p < k:
                dsi_new[el, p + 1] = el
                dist_new[el, p + 1] = dist[el, 0]
                p += 1
    return dist_new, dsi_new, l
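

# A toy trace of the greedy loop (hypothetical data; the helper name is
# illustrative). Every sample lists sample 0 as its closest non-self
# neighbour; with maxl=2, sample 0 can only be claimed twice, and rows
# processed later fall back to their next-closest options.
def _demo_balance_knn_loop():
    dsi = np.array(
        [[0, 1, 2, 3], [1, 0, 2, 3], [2, 0, 1, 3], [3, 0, 1, 2]], dtype=np.int64
    )
    dist = np.tile(np.arange(4, dtype=np.float64), (4, 1))  # toy sorted distances
    degree = np.bincount(dsi.flat[:], minlength=4)
    lsi = np.argsort(degree, kind="mergesort")[::-1]  # as knn_balance computes it
    dist_new, dsi_new, l = balance_knn_loop(dsi, dist, lsi, 2, 2)
    assert l.max() <= 2  # no sample is the neighbour of more than maxl others
    assert (dsi_new[:, 0] == np.arange(4)).all()  # column 0 is the sample itself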


def knn_balance(
    dsi: np.ndarray, dist: np.ndarray = None, maxl: int = 200, k: int = 60
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Balance a K-NN graph so that no node is the NN to more than maxl other nodes.

    Arguments
    ---------
    dsi : np.ndarray (samples, K)
        distance-sorted indexes (as returned by sklearn NN)
    dist : np.ndarray (samples, K)
        the actual distances corresponding to the sorted indexes
    maxl : int
        max degree of connectivity allowed
    k : int
        number of neighbours in the final graph

    Returns
    -------
    dist_new : np.ndarray (samples, k+1)
        distances to the NN
    dsi_new : np.ndarray (samples, k+1)
        indexes of the NN, first column is the sample itself
    l : np.ndarray (samples)
        l[i] is the number of connections from other samples to the sample i
    """
    l = np.bincount(dsi.flat[:], minlength=dsi.shape[0])  # degree of connectivity
    # element indexes sorted by descending degree of connectivity
    lsi = np.argsort(l, kind="mergesort")[::-1]
    return balance_knn_loop(dsi, dist, lsi, maxl, k)
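

# A pipeline sketch (hypothetical random data; the helper name is
# illustrative): balance the raw sklearn K-NN output so that no sample
# serves as a neighbour more than maxl times.
def _demo_knn_balance():
    rng = np.random.RandomState(0)
    data = rng.rand(100, 5)
    nn = _NearestNeighbors(n_neighbors=31).fit(data)
    dist, dsi = nn.kneighbors(data, return_distance=True)
    dist_new, dsi_new, l = knn_balance(dsi, dist, maxl=10, k=15)
    assert dsi_new.shape == (100, 16)  # k + 1 columns
    assert l.max() <= 10  # the degree cap holds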


class NearestNeighbors:
    """Greedy algorithm to balance a K-nearest neighbour graph.

    It has an API similar to scikit-learn.

    Parameters
    ----------
    k : int (default=50)
        the number of neighbours in the final graph
    sight_k : int (default=100)
        the number of neighbours in the initialization graph.
        It corresponds to the farthest neighbour that a sample is allowed to
        connect to when none of its closer neighbours are available. If
        sight_k is exhausted, the remaining entries are filled with the
        sample itself
    maxl : int (default=200)
        max degree of connectivity allowed. Avoids the presence of hubs in
        the graph: it is the maximum number of neighbours that are allowed
        to contact a node before the node is blocked
    mode : str (default="distance")
        decides which kind of output, "distance" or "connectivity"
    metric : str (default="euclidean")
        metric used for the initialization graph; "js" uses the
        Jensen-Shannon distance via pynndescent
    minkowski_p : int (default=20)
        the power parameter p passed to the sklearn nearest-neighbour search
    n_jobs : int (default=-1)
        parallelization of the standard KNN search performed at initialization
    """

    def __init__(
        self,
        k: int = 50,
        sight_k: int = 100,
        maxl: int = 200,
        mode: str = "distance",
        metric: str = "euclidean",
        minkowski_p: int = 20,
        n_jobs: int = -1,
    ) -> None:
        # input parameters
        self.k = k
        self.sight_k = sight_k
        self.maxl = maxl
        self.mode = mode
        self.metric = metric
        self.minkowski_p = minkowski_p
        self.n_jobs = n_jobs

        # NN graphs
        self.data = None
        self._nn = None  # raw KNN
        self.bknn = None  # balanced KNN
        self.dist = None  # balanced KNN distances
        self.dsi = None  # balanced KNN neighbor index
        self.l = None  # balanced KNN degree of connectivity
        self.mknn = None  # mutual KNN based on bknn
        self.rnn = None  # radius NN based on mknn

    @property
    def n_samples(self) -> int:
        """Number of samples in the fitted data."""
        return self.data.shape[0]

    def fit(self, data: np.ndarray, sight_k: int = None) -> Any:
        """Fit the model.

        Arguments
        ---------
        data : np.ndarray (samples, features)
            the data to build the graph on
        sight_k : int
            the farthest point that a node is allowed to connect to
            when its closest neighbours are not allowed
        """
        self.data = data
        if sight_k is not None:
            self.sight_k = sight_k
        logging.debug(
            f"First search the {self.sight_k} nearest neighbours for {self.n_samples}"
        )
        np.random.seed(13)
        if self.metric == "correlation":
            self._nn = _NearestNeighbors(
                n_neighbors=self.sight_k + 1,
                metric=self.metric,
                p=self.minkowski_p,
                n_jobs=self.n_jobs,
                algorithm="brute",
            )
            self._nn.fit(self.data)
        elif self.metric == "js":
            self._nn = NNDescent(data=self.data, metric=jensen_shannon_distance)
        else:
            self._nn = _NearestNeighbors(
                n_neighbors=self.sight_k + 1,
                metric=self.metric,
                p=self.minkowski_p,
                n_jobs=self.n_jobs,
                leaf_size=30,
            )
            self._nn.fit(self.data)
        # call this to calculate bknn
        self.kneighbors_graph(mode="distance")
        return self
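
    # Usage sketch (hypothetical random data; parameter values are only
    # illustrative). fit() builds the sight_k-NN initialization index and
    # immediately calls kneighbors_graph(), so bknn, dist, dsi and l are all
    # populated once it returns:
    #
    #   >>> data = np.random.RandomState(42).rand(200, 10)
    #   >>> bnn = NearestNeighbors(k=10, sight_k=30, maxl=20).fit(data)
    #   >>> assert bnn.bknn.shape == (200, 200)  # balanced K-NN, sparse csr
    #   >>> assert bnn.l.max() <= 20  # degree cap enforced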

    def kneighbors(
        self, X: np.ndarray = None, maxl: int = None, mode: str = "distance"
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Find the K-neighbors of a point.

        Returns indices of and distances to the neighbors of each point.

        Parameters
        ----------
        X : array-like, shape (n_query, n_features)
            The query point or points.
            If not provided, neighbors of each indexed point are returned.
            In this case, the query point is not considered its own neighbor.
        maxl : int
            max degree of connectivity allowed
        mode : "distance" or "connectivity"
            Decides the kind of output

        Returns
        -------
        dist_new : np.ndarray (samples, k+1)
            distances to the NN
        dsi_new : np.ndarray (samples, k+1)
            indexes of the NN, first column is the sample itself
        l : np.ndarray (samples)
            l[i] is the number of connections from other samples to the sample i

        NOTE: the first column (0) corresponds to the sample itself;
        the nearest neighbour is at the second column (1)
        """
        if self._nn is None:
            raise ValueError("must fit() before generating kneighbors graphs")
        if X is not None:
            self.data = X
        if maxl is not None:
            self.maxl = maxl

        if mode == "distance":
            if self.metric == "js":
                self.dsi, self.dist = self._nn.query(self.data, k=self.sight_k + 1)
            else:
                self.dist, self.dsi = self._nn.kneighbors(
                    self.data, return_distance=True
                )
        else:
            if self.metric == "js":
                self.dsi, _ = self._nn.query(self.data, k=self.sight_k + 1)
            else:
                self.dsi = self._nn.kneighbors(self.data, return_distance=False)
            self.dist = np.ones_like(self.dsi, dtype="float64")
            self.dist[:, 0] = 0
        logging.debug(
            f"Using the initialization network to find a {self.k}-NN "
            f"graph with maximum connectivity of {self.maxl}"
        )
        self.dist, self.dsi, self.l = knn_balance(
            self.dsi, self.dist, maxl=self.maxl, k=self.k
        )
        return self.dist, self.dsi, self.l
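
    # Sketch (continuing the hypothetical bnn estimator above): each row of
    # dsi_new starts with the sample itself, followed by up to k balanced
    # neighbours, and l reports how often each sample was used:
    #
    #   >>> dist_new, dsi_new, l = bnn.kneighbors(mode="distance")
    #   >>> assert dsi_new.shape == (bnn.n_samples, bnn.k + 1)
    #   >>> assert (dsi_new[:, 0] == np.arange(bnn.n_samples)).all()
    #   >>> assert l.max() <= bnn.maxl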

    def kneighbors_graph(
        self, X: np.ndarray = None, maxl: int = None, mode: str = "distance"
    ) -> sparse.csr_matrix:
        """Return the K-neighbors graph as a sparse csr matrix.

        Parameters
        ----------
        X : array-like, shape (n_query, n_features)
            The query point or points.
            If not provided, neighbors of each indexed point are returned.
            In this case, the query point is not considered its own neighbor.
        maxl : int
            max degree of connectivity allowed
        mode : "distance" or "connectivity"
            Decides the kind of output

        Returns
        -------
        neighbor_graph : scipy.sparse.csr_matrix
            The values are either distances or connectivity,
            depending on the mode parameter

        NOTE: the diagonal will be zero, even though the value 0 is actually stored
        """
        dist_new, dsi_new, _ = self.kneighbors(X=X, maxl=maxl, mode=mode)
        logging.debug("Returning sparse matrix")
        self.bknn = sparse.csr_matrix(
            (
                np.ravel(dist_new),
                np.ravel(dsi_new),
                np.arange(
                    0, dist_new.shape[0] * dist_new.shape[1] + 1, dist_new.shape[1]
                ),
            ),
            (self.n_samples, self.n_samples),
        )
        self.bknn.eliminate_zeros()
        return self.bknn
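
    # Note on the csr construction above: every row holds exactly k + 1
    # entries, so the indptr array is simply 0, k+1, 2*(k+1), ...; the
    # explicitly stored zeros (the self-distances on the diagonal) are then
    # dropped by eliminate_zeros(). Continuing the hypothetical example:
    #
    #   >>> g = bnn.kneighbors_graph(mode="distance")
    #   >>> assert g.shape == (bnn.n_samples, bnn.n_samples)
    #   >>> assert g.diagonal().sum() == 0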

    def mnn_graph(self):
        """Get the mutual nearest neighbor graph from bknn."""
        if self.mknn is None:
            if self.bknn is None:
                raise ValueError("must fit() before generating kneighbors graphs")
            # element-wise minimum between bknn and bknn.T,
            # so any non-mutual value will be 0
            self.mknn = self.bknn.minimum(self.bknn.transpose())
        return self.mknn
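
    # Why minimum() yields the mutual graph: a non-mutual edge is stored as 0
    # in one of the two directions, and min(d, 0) == 0 for any distance
    # d >= 0, so only edges present in both bknn and bknn.T survive.
    # Continuing the hypothetical example:
    #
    #   >>> m = bnn.mnn_graph()
    #   >>> assert (m != m.transpose()).nnz == 0  # mknn is symmetric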

    def rnn_graph(self):
        """Get the radius NN graph from mknn; return a sparse matrix of similarities."""
        if self.mknn is None:
            self.mnn_graph()

        # convert distances to similarities
        mknn_sim = self.mknn.copy()
        bknn_sim = self.bknn.copy()
        max_d = self.bknn.data.max()
        bknn_sim.data = (max_d - bknn_sim.data) / max_d
        mknn_sim.data = (max_d - mknn_sim.data) / max_d
        mknn_sim.setdiag(0)
        mknn_sim = mknn_sim.tocoo()

        # compute the effective radius as the 90th percentile
        # of the normalized distances
        d = 1 - bknn_sim.data
        radius = np.percentile(d, 90)
        logging.info(f"90th percentile radius: {radius:.02}")
        # keep only mutual edges whose normalized distance is within the radius
        inside = mknn_sim.data > 1 - radius
        self.rnn = sparse.coo_matrix(
            (mknn_sim.data[inside], (mknn_sim.row[inside], mknn_sim.col[inside])),
            shape=mknn_sim.shape,
        )
        return self.rnn
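

# An end-to-end sketch (hypothetical random data; all parameter values are
# illustrative, not recommendations): fit the balanced graph, then derive
# the mutual and radius neighbour graphs from it.
if __name__ == "__main__":
    data = np.random.RandomState(42).rand(300, 20)
    bnn = NearestNeighbors(k=15, sight_k=45, maxl=30).fit(data)
    print("balanced KNN edges:", bnn.bknn.nnz)
    print("mutual KNN edges:", bnn.mnn_graph().nnz)
    print("radius NN edges:", bnn.rnn_graph().nnz)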