Source code for ALLCools.clustering.incremental_pca
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import IncrementalPCA as _IncrementalPCA
from ..count_matrix.zarr import dataset_to_array
def _normalize_per_cell(matrix, cell_sum):
    """Normalize each row (cell) of the matrix to counts per million (CPM)."""
    print("normalize per cell to CPM")
    if cell_sum is None:
        # use observed row sums; +1 guards against all-zero rows
        norm_vec = (matrix.sum(axis=1) + 1) / 1000000
    else:
        # cell_sum is expected to be a pandas Series aligned to the rows
        norm_vec = cell_sum / 1000000
        norm_vec = norm_vec.values
    norm_vec = norm_vec.astype(np.float32)
    matrix /= norm_vec[:, None]
    return matrix
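
# A minimal sketch of what ``_normalize_per_cell`` computes, on made-up
# numbers (purely illustrative, not part of the module):
#
#     demo = np.array([[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]], dtype=np.float32)
#     cpm = _normalize_per_cell(demo, cell_sum=None)
#     # row 0 is divided by (6 + 1) / 1e6, row 1 by (60 + 1) / 1e6,
#     # so each row sum lands near one million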
class IncrementalPCA:
def __init__(
self,
n_components=100,
sparse=False,
normalize_per_cell=True,
log1p=True,
scale=True,
**kwargs,
):
"""
Perform PCA for huge dataset that exceeds physical memory. Start from the raw count matrix stored in
Parameters
----------
n_components
number of PCs to calculate
sparse
Whether treat the matrix as sparse matrix. If true, will
1) load data as sparse matrix;
2) do not perform mean center when scaling, only scale the std
normalize_per_cell
Normalize the matrix per cell
log1p
whether perform log1p transform
scale
whether scale the features
kwargs
parameters of sklearn.decomposition.IncrementalPCA
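
        Examples
        --------
        A minimal sketch; ``mcds`` is a stand-in name for whatever
        zarr-backed dataset ``dataset_to_array`` accepts::

            ipca = IncrementalPCA(n_components=50, sparse=True)
            pcs = ipca.fit_transform(mcds, chunk=500000)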
"""
self.pca = _IncrementalPCA(n_components=n_components, **kwargs)
self.sparse = sparse
self.normalize_per_cell = normalize_per_cell
self.log1p = log1p
self.scale = scale
self.scaler = None
self.cell_sum = None
self.use_features = None
self.obs_dim = None
self.var_dim = None
self.load_chunk = None
        self.chunk_stds = None
        self.chunk_means = None
        self._fit = False
return
    def fit(
        self,
        ds,
        use_cells=None,
        use_features=None,
        chunk=500000,
        cell_sum=None,
        var_dim="gene",
        obs_dim="cell",
        load_chunk=None,
        random_shuffle=True,
    ):
        """Fit the PCA model chunk by chunk via IncrementalPCA.partial_fit."""
self.cell_sum = cell_sum
self.use_features = use_features
self.obs_dim = obs_dim
self.var_dim = var_dim
self.load_chunk = chunk if load_chunk is None else load_chunk
# prepare index
cell_index = ds.get_index(obs_dim)
if use_cells is not None:
cell_index = cell_index[cell_index.isin(use_cells)].copy()
# random shuffle to make fitting more stable
if random_shuffle:
cell_order = cell_index.tolist()
np.random.shuffle(cell_order)
cell_order = pd.Index(cell_order)
else:
cell_order = cell_index
# fit by chunks
chunk_stds = []
chunk_means = []
for chunk_start in range(0, cell_order.size, chunk):
print(f"Fitting {chunk_start}-{chunk_start + chunk}")
_chunk_cells = cell_order[chunk_start : chunk_start + chunk]
_chunk_matrix, _chunk_cells, _chunk_genes = dataset_to_array(
ds,
use_cells=_chunk_cells,
use_genes=use_features,
sparse=self.sparse,
obs_dim=obs_dim,
var_dim=var_dim,
chunk=self.load_chunk,
)
if cell_sum is not None:
_chunk_cell_sum = cell_sum.loc[_chunk_cells]
else:
_chunk_cell_sum = None
_chunk_matrix = _chunk_matrix.astype(np.float32)
# normalize cell counts
if self.normalize_per_cell:
_chunk_matrix = _normalize_per_cell(
matrix=_chunk_matrix, cell_sum=_chunk_cell_sum
)
            # log1p transform
if self.log1p:
print("log1p transform")
_chunk_matrix = np.log1p(_chunk_matrix)
# scale
if self.scale:
print("Scale")
if self.scaler is None:
                    # fit the scaler on the first chunk only, assuming that
                    # chunk (500,000 cells by default) is large enough to be representative
self.scaler = StandardScaler(with_mean=not self.sparse)
_chunk_matrix = self.scaler.fit_transform(_chunk_matrix)
else:
# transform remaining cells
_chunk_matrix = self.scaler.transform(_chunk_matrix)
            # save per-chunk statistics so scaling robustness can be checked
            chunk_stds.append(_chunk_matrix.std(axis=0))
            chunk_means.append(_chunk_matrix.mean(axis=0))
            # fit IncrementalPCA
            print("Fit PCA")
            self.pca.partial_fit(_chunk_matrix)
        # keep the per-chunk statistics on the instance for later inspection
        self.chunk_stds = chunk_stds
        self.chunk_means = chunk_means
        self._fit = True
        return
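
    # The loop in ``fit`` follows sklearn's out-of-core pattern; a standalone
    # sketch of the same idea on synthetic data (shapes are made up):
    #
    #     from sklearn.decomposition import IncrementalPCA as SkIPCA
    #     pca = SkIPCA(n_components=10)
    #     for _ in range(5):
    #         pca.partial_fit(np.random.rand(1000, 200))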
    def transform(self, ds, use_cells=None, chunk=100000):
        """Project cells onto the fitted components, chunk by chunk."""
if not self._fit:
raise ValueError("fit first before transform")
cell_index = ds.get_index(self.obs_dim)
if use_cells is not None:
cell_index = cell_index[cell_index.isin(use_cells)].copy()
total_pcs = []
for chunk_start in range(0, cell_index.size, chunk):
print(f"Transforming {chunk_start}-{chunk_start + chunk}")
_chunk_cells = cell_index[chunk_start : chunk_start + chunk]
_chunk_matrix, _chunk_cells, _chunk_genes = dataset_to_array(
ds,
use_cells=_chunk_cells,
use_genes=self.use_features,
sparse=self.sparse,
obs_dim=self.obs_dim,
var_dim=self.var_dim,
chunk=self.load_chunk,
)
if self.cell_sum is not None:
_chunk_cell_sum = self.cell_sum.loc[_chunk_cells]
else:
_chunk_cell_sum = None
_chunk_matrix = _chunk_matrix.astype(np.float32)
# normalize cell counts
if self.normalize_per_cell:
_chunk_matrix = _normalize_per_cell(
matrix=_chunk_matrix, cell_sum=_chunk_cell_sum
)
            # log1p transform
if self.log1p:
print("log1p transform")
_chunk_matrix = np.log1p(_chunk_matrix)
# scale
if self.scale:
print("Scale")
                if self.scaler is None:
                    # should not happen after a successful fit()
                    raise ValueError("scale is True, but the scaler does not exist")
                else:
                    # apply the scaler fitted on the first training chunk
                    _chunk_matrix = self.scaler.transform(_chunk_matrix)
# transform
print("Transform PCA")
pcs = self.pca.transform(_chunk_matrix)
pcs = pd.DataFrame(pcs, index=_chunk_cells)
total_pcs.append(pcs)
total_pcs = pd.concat(total_pcs)
return total_pcs
    def fit_transform(
        self,
        ds,
        use_cells=None,
        use_features=None,
        chunk=500000,
        cell_sum=None,
        var_dim="gene",
        obs_dim="cell",
        load_chunk=None,
        random_shuffle=True,
    ):
        """Fit the model on the dataset, then return the PCs of the same cells."""
self.fit(
ds,
use_cells=use_cells,
use_features=use_features,
chunk=chunk,
cell_sum=cell_sum,
var_dim=var_dim,
obs_dim=obs_dim,
load_chunk=load_chunk,
random_shuffle=random_shuffle,
)
total_pcs = self.transform(ds, use_cells=use_cells, chunk=self.load_chunk)
return total_pcs
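
# --- hedged usage sketch (illustrative, not part of the module) ---
# ``ds`` stands for whatever zarr-backed dataset ``dataset_to_array`` accepts;
# opening that dataset is outside the scope of this file.
#
#     ipca = IncrementalPCA(n_components=100)
#     ipca.fit(ds, chunk=500000, obs_dim="cell", var_dim="gene")
#     pcs = ipca.transform(ds, chunk=100000)  # pandas DataFrame indexed by cell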