import numpy as np
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import warnings
from pybedtools import BedTool

[docs]def remove_black_list_region(adata, black_list_path, f=0.2): """ Remove regions overlap (bedtools intersect -f {f}) with regions in the black_list_path Parameters ---------- adata black_list_path Path to the black list bed file f Fraction of overlap when calling bedtools intersect Returns ------- None """ with warnings.catch_warnings(): warnings.simplefilter("ignore") feature_bed_df = adata.var[["chrom", "start", "end"]] feature_bed = BedTool.from_dataframe(feature_bed_df) black_list_bed = BedTool(black_list_path) black_feature = feature_bed.intersect(black_list_bed, f=f, wa=True) try: black_feature_index = ( black_feature.to_dataframe().set_index(["chrom", "start", "end"]).index ) black_feature_id = pd.Index( feature_bed_df.reset_index() .set_index(["chrom", "start", "end"]) .loc[black_feature_index][] ) print( f"{black_feature_id.size} features removed due to overlapping" f" (bedtools intersect -f {f}) with black list regions." ) adata._inplace_subset_var(~adata.var_names.isin(black_feature_id)) except pd.errors.EmptyDataError: # no overlap with black list pass return
[docs]def binarize_matrix(adata, cutoff=0.95): """ Binarize adata.X with adata.X > cutoff Parameters ---------- adata AnnData object whose X is survival function matrix cutoff Cutoff to binarize the survival function Returns ------- None """ adata.X = (adata.X > cutoff).astype(np.int8) return
[docs]def filter_regions(adata, hypo_percent=0.5): """ Filter regions based on % of cells having non-zero scores. Parameters ---------- adata hypo_percent min % of cells that are non-zero in this region. Returns ------- """ n_cell = int(adata.shape[0] * hypo_percent / 100) hypo_judge = (adata.X > 0).sum(axis=0).A1 > n_cell adata._inplace_subset_var(hypo_judge) print(f'{hypo_judge.sum()} regions remained, with # of non-zero cells > {n_cell}.') return
[docs]def tf_idf(data, scale_factor): col_sum = data.sum(axis=0).A1 row_sum = data.sum(axis=1).A1 idf = np.log(1 + data.shape[0] / col_sum) tf = data = / np.repeat(row_sum, row_sum) = np.log( * scale_factor + 1) tf = tf.multiply(idf) return tf
[docs]def lsi( adata, scale_factor=100000, n_components=100, algorithm="arpack", obsm="X_pca", random_state=0, fit_size=None, ): """ Run TF-IDF on the binarized adata.X, followed by TruncatedSVD and then scale the components by svd.singular_values_ Parameters ---------- adata scale_factor n_components algorithm obsm random_state fit_size Ratio or absolute int value, use to downsample when fitting the SVD to speed up run time. Returns ------- """ # tf-idf data = adata.X.astype(np.int8).copy() tf = tf_idf(data, scale_factor) n_rows, n_cols = tf.shape n_components = min(n_rows, n_cols, n_components) svd = TruncatedSVD( n_components=n_components, algorithm=algorithm, random_state=random_state ) if fit_size is None: # fit the SVD using all rows matrix_reduce = svd.fit_transform(tf) elif fit_size >= n_rows: # fit size is larger than actual data size matrix_reduce = svd.fit_transform(tf) else: # fit the SVD using partial rows to speed up if fit_size < 1: fit_size = max(int(n_rows * fit_size), n_components) use_cells = ( pd.Series(range(n_rows)) .sample(fit_size, random_state=random_state) .sort_index() .tolist() )[use_cells, :]) matrix_reduce = svd.transform(tf) matrix_reduce = matrix_reduce / svd.singular_values_ # PCA is the default name for many following steps in scanpy, use the name here for convenience. # However, this is not PCA adata.obsm[obsm] = matrix_reduce return svd