import glob
import pathlib
import re
import warnings
from typing import Union
import anndata
import dask
import numpy as np
import pandas as pd
import xarray as xr
import yaml
from pybedtools import BedTool
from .utilities import (
highly_variable_methylation_feature,
calculate_posterior_mc_frac,
determine_engine,
)
from ..plot.qc_plots import cutoff_vs_cell_remain, plot_dispersion
def make_obs_df_var_df(use_data, obs_dim, var_dim):
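    """Build obs and var DataFrames for AnnData from a DataArray's 1-D coordinates, split by dimension."""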
obs_df = pd.DataFrame([], index=use_data.get_index(obs_dim).astype(str))
var_df = pd.DataFrame([], index=use_data.get_index(var_dim).astype(str))
coord_prefix = re.compile(f"({obs_dim}|{var_dim})_")
for k, v in use_data.coords.items():
if k in [obs_dim, var_dim]:
continue
try:
# v.dims should be size 1
if v.dims[0] == obs_dim:
series = v.to_pandas()
# adata.obs_name is str type
series.index = series.index.astype(str)
obs_df[coord_prefix.sub("", k)] = series
elif v.dims[0] == var_dim:
series = v.to_pandas()
# adata.var_name is str type
series.index = series.index.astype(str)
var_df[coord_prefix.sub("", k)] = series
else:
pass
except IndexError:
            # v.dims is empty (scalar coordinate), just ignore
pass
return obs_df, var_df
class MCDS(xr.Dataset):
"""
MCDS Class
"""
def __init__(self, dataset, obs_dim=None, var_dim=None):
"""
        Initialize an MCDS from an existing xarray.Dataset.

        Parameters
        ----------
        dataset
            xarray.Dataset whose data_vars, coords, and attrs initialize the MCDS
        obs_dim
            Name of the observation dimension
        var_dim
            Name of the variable (feature) dimension
"""
super().__init__(
data_vars=dataset.data_vars, coords=dataset.coords, attrs=dataset.attrs
)
self.obs_dim = obs_dim
self.var_dim = var_dim
if self.obs_dim is not None:
            # validate that obs_names are unique
            if self.obs_names.duplicated().sum() != 0:
                warnings.warn('obs_names are not unique.')
return
@property
    def var_dim(self):
if self.attrs['var_dim'] == 'null':
return None
else:
return self.attrs['var_dim']
@var_dim.setter
def var_dim(self, var_dim):
if var_dim is not None:
if var_dim not in self.dims:
raise KeyError(
f"{var_dim} does not occur in dimension names: {list(self.dims.keys())}"
)
else:
var_dim = 'null'
self.attrs['var_dim'] = var_dim
return
@property
    def obs_dim(self):
if self.attrs['obs_dim'] == 'null':
return None
else:
return self.attrs['obs_dim']
@obs_dim.setter
def obs_dim(self, obs_dim):
if obs_dim is not None:
if obs_dim not in self.dims:
raise KeyError(
f"{obs_dim} does not occur in dimension names: {list(self.dims.keys())}"
)
else:
obs_dim = 'null'
self.attrs['obs_dim'] = obs_dim
return
@property
    def obs_names(self):
if self.obs_dim is None:
return None
else:
obs_index = self.get_index(self.obs_dim)
return obs_index
@property
    def var_names(self):
if self.var_dim is None:
return None
else:
var_index = self.get_index(self.var_dim)
return var_index
    def _verify_dim(self, dim, mode):
if dim is None:
if mode == 'var':
if self.var_dim is None:
raise ValueError('MCDS does not have var_dim set, please provide the name of var_dim')
else:
return self.var_dim
else:
if self.obs_dim is None:
raise ValueError('MCDS does not have obs_dim set, please provide the name of obs_dim')
else:
return self.obs_dim
else:
if dim not in self.dims:
raise KeyError(
f"{dim} does not occur in dimension names: {list(self.dims.keys())}"
)
return dim
@classmethod
    def open(cls,
mcds_paths,
obs_dim="cell",
use_obs=None,
var_dim=None,
chunks='auto',
split_large_chunks=True,
obj_to_str=True,
engine=None):
"""
        Take one or multiple MCDS file paths and create a single MCDS concatenated along obs_dim.

        Parameters
        ----------
        mcds_paths
            A single MCDS path, an MCDS path pattern containing wildcards, or a list of MCDS paths
        obs_dim
            Dimension name of observations, default is 'cell'
        use_obs
            Subset the MCDS by a list of observation IDs.
        var_dim
            Which var_dim dataset to use; needed when the MCDS stores multiple var_dim in the same directory
        chunks
            If not None, xarray loads the data as dask arrays with these chunks. "auto" lets xarray
            determine chunks automatically; for more options, read the `xarray.open_dataset` `chunks`
            parameter documentation. If None, xarray will not use dask, which is undesirable in most cases.
        split_large_chunks
            Whether to split large chunks via the dask config option array.slicing.split_large_chunks
        obj_to_str
            Whether to convert object-dtype coordinates to string dtype
        engine
            xarray engine used to store the MCDS; if multiple MCDS are provided, they must all use the same engine
Returns
-------
MCDS
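
        Examples
        --------
        A minimal sketch; the file paths are hypothetical::

            mcds = MCDS.open('study.mcds', var_dim='chrom100k')
            # wildcard pattern: all matching datasets are concatenated along 'cell'
            mcds = MCDS.open('batch_*.mcds', obs_dim='cell', var_dim='chrom100k')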
"""
# parse wildcard, if any
if isinstance(mcds_paths, (str, pathlib.Path)):
mcds_paths = [mcds_paths]
_flat_mcds_paths = []
for path in mcds_paths:
if isinstance(path, str):
if '*' in path:
_flat_mcds_paths += list(glob.glob(path))
else:
_flat_mcds_paths.append(path)
else:
_flat_mcds_paths.append(str(path))
# if var_dim is a list, open each separately and return merged MCDS:
if isinstance(var_dim, list):
ds_list = []
for _var_dim in var_dim:
ds = cls.open(_flat_mcds_paths,
obs_dim=obs_dim,
use_obs=use_obs,
var_dim=_var_dim,
chunks=chunks,
split_large_chunks=split_large_chunks,
engine=engine)
ds_list.append(ds)
ds = xr.merge(ds_list)
return cls(ds, obs_dim=obs_dim, var_dim=None).squeeze()
# determine dataset var_dim
_var_dims = set()
_final_paths = []
has_dataset = False
for path in _flat_mcds_paths:
config_path = pathlib.Path(f'{path}/.ALLCools')
if config_path.exists():
has_dataset = True
with open(config_path) as f:
config = yaml.safe_load(f)
if var_dim is None:
if config['region_dim'] is None:
# try to infer if there is only one region
if len(config['ds_region_dim']) == 1:
_ds_var_dim = list(config['ds_region_dim'].values())[0]
_var_dims.add(_ds_var_dim)
else:
raise ValueError(
                                f'MCDS path contains multiple datasets '
                                f'{list(config["ds_region_dim"].keys())}; '
                                'please specify one or more names with the "var_dim" parameter.'
)
else:
_ds_var_dim = config['region_dim']
_var_dims.add(config['region_dim'])
_final_paths.append(f'{path}/{_ds_var_dim}')
else:
if var_dim not in config['ds_region_dim'].keys():
                        raise KeyError(f'{path} does not have {var_dim}')
_var_dims.add(var_dim)
_final_paths.append(f'{path}/{var_dim}')
else:
# single netcdf or zarr file
_final_paths.append(path)
if has_dataset:
if len(_var_dims) != 1:
                raise ValueError('MCDS paths contain multiple different var_dim values; please specify the var_dim parameter.')
else:
var_dim = _var_dims.pop()
for path in _final_paths:
if not pathlib.Path(path).exists():
raise FileNotFoundError(f'{path} does not exist.')
# determine engine
if engine is None:
engine = determine_engine(_final_paths)
if len(_final_paths) == 1:
ds = xr.open_dataset(
_final_paths[0],
engine=engine,
# chunks=chunks # do not apply chunks parameter here, apply in the end
)
else:
with dask.config.set(
**{"array.slicing.split_large_chunks": split_large_chunks}):
ds = xr.open_mfdataset(
_final_paths,
parallel=False,
combine="nested",
concat_dim=obs_dim,
# chunks=chunks, # do not apply chunks parameter here, apply in the end
engine=engine,
)
if use_obs is not None:
with dask.config.set(
**{"array.slicing.split_large_chunks": split_large_chunks}):
use_obs_bool = ds.get_index(obs_dim).isin(use_obs)
ds = ds.sel({obs_dim: use_obs_bool})
# change object dtype to string
if obj_to_str:
for k in ds.coords.keys():
if ds.coords[k].dtype == 'O':
ds.coords[k] = ds.coords[k].astype(str)
if chunks is not None:
if chunks == 'auto':
chunks = {k: min(4096, max(v // 5, 1)) for k, v in ds.dims.items()}
ds = ds.chunk(chunks=chunks)
return cls(ds, obs_dim=obs_dim, var_dim=var_dim).squeeze()
    def add_mc_frac(
self,
var_dim=None,
da=None,
normalize_per_cell=True,
clip_norm_value=10,
da_suffix="frac",
):
"""
        Add a posterior mC fraction data array for a certain feature type (var_dim).

        Parameters
        ----------
        var_dim
            Name of the feature type
        da
            Name of the raw count data array; if None, f'{var_dim}_da' is used
        normalize_per_cell
            If True, normalize the mC fraction data array per cell
        clip_norm_value
            Values in the normalized mC fraction data array larger than this are clipped to this value
        da_suffix
            Name suffix appended to the calculated mC fraction data array
Returns
        -------
        None
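
        Examples
        --------
        A minimal sketch, assuming a 'chrom100k_da' data array with 'mc' and 'cov' count types::

            mcds.add_mc_frac(var_dim='chrom100k', normalize_per_cell=True, clip_norm_value=10)
            # the result is stored as mcds['chrom100k_da_frac']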
"""
var_dim = self._verify_dim(dim=var_dim, mode='var')
if da is None:
da = f"{var_dim}_da"
if da not in self.data_vars:
raise KeyError(f"{da} is not in this dataset")
if var_dim not in self[da].dims:
raise KeyError(f"{var_dim} is not a dimension of {da}")
da_mc = self[da].sel(count_type="mc")
da_cov = self[da].sel(count_type="cov")
frac = calculate_posterior_mc_frac(
mc_da=da_mc,
cov_da=da_cov,
var_dim=var_dim,
normalize_per_cell=normalize_per_cell,
clip_norm_value=clip_norm_value,
)
self[da + "_" + da_suffix] = frac
    def add_mc_rate(self, *args, **kwargs):
warnings.warn(
'MCDS.add_mc_rate is renamed to MCDS.add_mc_frac, the default suffix also changed from "rate" to "frac"',
DeprecationWarning,
)
self.add_mc_frac(*args, **kwargs)
    def add_feature_cov_mean(self, obs_dim=None, var_dim=None, plot=True):
"""
        Add the mean coverage of each feature across obs_dim.

        Parameters
        ----------
        obs_dim
            Name of the obs dimension
        var_dim
            Name of the var dimension
        plot
            If True, plot the distribution of feature coverage means
Returns
-------
None
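
        Examples
        --------
        A minimal sketch; 'chrom100k' is a hypothetical var_dim::

            mcds.add_feature_cov_mean(var_dim='chrom100k', plot=False)
            # the result is stored in mcds.coords['chrom100k_cov_mean']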
"""
obs_dim = self._verify_dim(obs_dim, mode='obs')
var_dim = self._verify_dim(var_dim, mode='var')
cov_mean_key = f"{var_dim}_cov_mean"
if cov_mean_key in self.coords:
print(f'Using existing {cov_mean_key} in MCDS.coords, if this is not desired, delete the '
f'existing results by `del mcds.coords["{cov_mean_key}"]` and recalculate.')
feature_cov_mean = self.coords[cov_mean_key].to_pandas()
else:
_da = self[f"{var_dim}_da"]
if 'mc_type' in _da.dims:
feature_cov_mean = _da.sel(count_type="cov").sum(dim="mc_type").mean(dim=obs_dim).squeeze().to_pandas()
else:
feature_cov_mean = _da.sel(count_type="cov").mean(dim=obs_dim).squeeze().to_pandas()
self.coords[cov_mean_key] = feature_cov_mean
print(
f"Feature {var_dim} mean cov across cells added in MCDS.coords['{cov_mean_key}']."
)
if plot:
cutoff_vs_cell_remain(feature_cov_mean, name=f"{var_dim}_cov_mean")
return
    def filter_feature_by_cov_mean(self, var_dim=None, min_cov=0, max_cov=999999):
"""
        Filter the MCDS by feature coverage mean. add_feature_cov_mean() must be called before this function.
Parameters
----------
var_dim
Name of var dimension
min_cov
Minimum cov cutoff
max_cov
Maximum cov cutoff
Returns
-------
MCDS
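
        Examples
        --------
        A minimal sketch; the cutoffs depend on your coverage distribution::

            mcds.add_feature_cov_mean(var_dim='chrom100k', plot=False)
            mcds = mcds.filter_feature_by_cov_mean(var_dim='chrom100k', min_cov=10, max_cov=3000)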
"""
var_dim = self._verify_dim(var_dim, mode='var')
try:
feature_cov_mean = self.coords[f"{var_dim}_cov_mean"].to_pandas()
except KeyError:
raise KeyError(
f"{var_dim}_cov_mean not found in the coords, run add_feature_cov_mean() first."
)
judge: pd.Series = (feature_cov_mean > min_cov) & (feature_cov_mean < max_cov)
with dask.config.set(**{"array.slicing.split_large_chunks": False}):
# ignore dask warning
mcds = self.sel({var_dim: judge[judge].index})
selected_feature = judge.sum()
ratio = selected_feature / judge.size * 100
print(f"Before cov mean filter: {judge.size} {var_dim}")
print(f" After cov mean filter: {selected_feature} {var_dim} {ratio:.1f}%")
return mcds
    def get_feature_bed(self, var_dim=None):
"""
        Get a BED-format data frame of the var_dim features.
Parameters
----------
var_dim
Name of var_dim
Returns
-------
pd.DataFrame
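
        Examples
        --------
        A minimal sketch::

            bed_df = mcds.get_feature_bed(var_dim='chrom100k')
            # bed_df has chrom/start/end columns indexed by feature ID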
"""
var_dim = self._verify_dim(var_dim, mode='var')
if f"{var_dim}_bin_start" in self.coords:
start_name = f"{var_dim}_bin_start"
elif f"{var_dim}_start" in self.coords:
start_name = f"{var_dim}_start"
else:
raise KeyError(f'Neither {var_dim}_bin_start nor {var_dim}_start exists in MCDS.coords')
if f"{var_dim}_bin_end" in self.coords:
end_name = f"{var_dim}_bin_end"
elif f"{var_dim}_end" in self.coords:
end_name = f"{var_dim}_end"
else:
raise KeyError(f'Neither {var_dim}_bin_end nor {var_dim}_end exists in MCDS.coords')
bed_df = pd.DataFrame(
[
self.coords[f"{var_dim}_chrom"].to_pandas(),
self.coords[start_name].to_pandas(),
self.coords[end_name].to_pandas(),
],
index=["chrom", "start", "end"],
columns=self.get_index(var_dim),
).T
return bed_df
    def remove_black_list_region(self, black_list_path, var_dim=None, f=0.2):
"""
        Remove regions that overlap (bedtools intersect -f {f}) regions in the black_list_path.
Parameters
----------
var_dim
Name of var_dim
black_list_path
Path to the black list bed file
f
Fraction of overlap when calling bedtools intersect
Returns
-------
MCDS
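
        Examples
        --------
        A minimal sketch; the blacklist file path is hypothetical::

            mcds = mcds.remove_black_list_region('hg38-blacklist.v2.bed', var_dim='chrom100k', f=0.2)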
"""
var_dim = self._verify_dim(var_dim, mode='var')
with warnings.catch_warnings():
warnings.simplefilter("ignore")
feature_bed_df = self.get_feature_bed(var_dim=var_dim)
feature_bed = BedTool.from_dataframe(feature_bed_df)
black_list_bed = BedTool(black_list_path)
black_feature = feature_bed.intersect(black_list_bed, f=f, wa=True)
try:
black_feature_index = (
black_feature.to_dataframe()
.set_index(["chrom", "start", "end"])
.index
)
except pd.errors.EmptyDataError:
print("No feature overlapping the black list bed file.")
return self
black_feature_id = pd.Index(
feature_bed_df.reset_index()
.set_index(["chrom", "start", "end"])
.loc[black_feature_index][var_dim]
)
print(
f"{black_feature_id.size} {var_dim} features removed due to overlapping"
f" (bedtools intersect -f {f}) with black list regions."
)
with dask.config.set(**{"array.slicing.split_large_chunks": False}):
mcds = self.sel({var_dim: ~self.get_index(var_dim).isin(black_feature_id)})
return mcds
    def remove_chromosome(self, exclude_chromosome, var_dim=None):
"""
        Remove regions on specific chromosomes.
Parameters
----------
var_dim
Name of var_dim
exclude_chromosome
            Chromosome name, or a list of chromosome names, to remove
Returns
-------
MCDS (xr.Dataset)
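
        Examples
        --------
        A minimal sketch::

            mcds = mcds.remove_chromosome(['chrX', 'chrY', 'chrM'], var_dim='chrom100k')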
"""
var_dim = self._verify_dim(var_dim, mode='var')
judge = self.coords[f"{var_dim}_chrom"].isin(exclude_chromosome)
print(f"{int(judge.sum())} {var_dim} features in {exclude_chromosome} removed.")
with dask.config.set(**{"array.slicing.split_large_chunks": False}):
mcds = self.sel({var_dim: ~judge})
return mcds
    def calculate_hvf_svr(
self,
mc_type=None,
var_dim=None,
obs_dim=None,
n_top_feature=5000,
da_suffix="frac",
plot=True,
):
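        """
        Select highly variable features (HVF) with a support vector regression:
        fit log2(dispersion) as a function of log2(mean mC fraction) and
        log2(mean coverage), then rank features by the regression residual and
        select the top n_top_feature.
        """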
from sklearn.svm import SVR
import plotly.graph_objects as go
obs_dim = self._verify_dim(obs_dim, mode='obs')
var_dim = self._verify_dim(var_dim, mode='var')
frac_da = self[f"{var_dim}_da_{da_suffix}"]
count_da = self[f"{var_dim}_da"]
if 'mc_type' in self.dims:
if mc_type is None:
mc_types = self.get_index('mc_type')
if mc_types.size > 1:
raise ValueError(f'Data array has multiple mc_types, '
f'please specify mc_type parameter.')
else:
mc_type = mc_types[0]
feature_mc_frac_mean = frac_da.sel(mc_type=mc_type).mean(dim=obs_dim).to_pandas()
feature_std = frac_da.sel(mc_type=mc_type).std(dim=obs_dim).to_pandas()
feature_cov_mean = count_da.sel(mc_type=mc_type, count_type="cov").mean(dim=obs_dim).to_pandas()
else:
feature_mc_frac_mean = frac_da.mean(dim=obs_dim).to_pandas()
feature_std = frac_da.std(dim=obs_dim).to_pandas()
feature_cov_mean = count_da.sel(count_type="cov").mean(dim=obs_dim).to_pandas()
# remove bad features
judge = (feature_mc_frac_mean > 0) & (feature_std > 0) & (feature_cov_mean > 0)
if n_top_feature >= judge.size:
n_top_feature = judge.size
print(
"n_top_feature is than total number of features, will use all features"
)
feature_mc_frac_mean = feature_mc_frac_mean[judge]
feature_var = (
feature_std[judge] ** 2
) # to be consistent with previous bin-based method, use var here
feature_cov_mean = feature_cov_mean[judge]
# prepare data
dispersion = feature_var / feature_mc_frac_mean
log2_disp = np.log2(dispersion)
log2_mc_frac_mean = np.log2(feature_mc_frac_mean)
log2_cov_mean = np.log2(feature_cov_mean)
x = np.vstack((log2_mc_frac_mean, log2_cov_mean)).T
# non-linear regression predicting dispersion using mc_frac_mean and cov_mean.
svr_gamma = 1000 / judge.sum()
print(
f"Fitting SVR with gamma {svr_gamma:.4f}, "
f"predicting feature dispersion using mc_frac_mean and cov_mean."
)
clf = SVR(gamma=svr_gamma)
clf.fit(x, log2_disp)
        # Score is the relative position with respect to the fitted curve
score = log2_disp - clf.predict(x)
selected_feature_index = score.sort_values()[-n_top_feature:].index
# make results table
hvf_df = pd.DataFrame(
{
"mean": feature_mc_frac_mean.reindex(judge.index).fillna(0),
"dispersion": dispersion.reindex(judge.index).fillna(0),
"cov": feature_cov_mean.reindex(judge.index).fillna(0),
"score": score.reindex(judge.index).fillna(-100),
}
)
hvf_df["feature_select"] = hvf_df.index.isin(selected_feature_index)
print(f"Total Feature Number: {judge.size}")
print(
f"Highly Variable Feature: {selected_feature_index.size} "
f"({(selected_feature_index.size / judge.size * 100):.1f}%)"
)
if plot:
if hvf_df.shape[0] > 5000:
plot_data = hvf_df.sample(5000)
else:
plot_data = hvf_df
fig = go.Figure(
data=[
go.Scatter3d(
x=plot_data["mean"],
y=plot_data["cov"],
z=np.log2(plot_data["dispersion"]),
mode="markers",
hoverinfo="none",
marker=dict(
size=2,
color=plot_data["feature_select"]
.map({True: "red", False: "gray"})
.tolist(), # set color to an array/list of desired values
opacity=0.8,
),
)
]
)
fig.update_layout(
scene=dict(
xaxis_title="mC Frac. Mean",
yaxis_title="Coverage Mean",
zaxis_title="log2(Dispersion)",
),
margin=dict(r=0, b=0, l=0, t=0),
)
fig.show()
for name, column in hvf_df.items():
if mc_type is None:
self.coords[f"{var_dim}_{name}"] = column
else:
self.coords[f"{var_dim}_{mc_type}_{name}"] = column
return hvf_df
    def calculate_hvf(
self,
mc_type=None,
var_dim=None,
obs_dim=None,
min_disp=0.5,
max_disp=None,
min_mean=0,
max_mean=5,
n_top_feature=5000,
bin_min_features=5,
mean_binsize=0.05,
cov_binsize=100,
da_suffix="frac",
plot=True,
):
"""
Calculate normalized dispersion to select highly variable features.
Parameters
----------
mc_type
Type of mC to calculate
var_dim
            Name of the variable (feature) dimension
obs_dim
Name of observation, default is cell
min_disp
minimum dispersion for a feature to be considered
max_disp
maximum dispersion for a feature to be considered
min_mean
minimum mean for a feature to be considered
max_mean
maximum mean for a feature to be considered
n_top_feature
            Top N features to select as highly variable features.
            If set, all cutoffs are ignored and HVFs are selected by the rank of normalized dispersion.
bin_min_features
            Minimum number of features required to form a separate bin;
            if below this number, the bin is merged into its closest bin.
mean_binsize
bin size to separate features across mean
cov_binsize
            bin size to separate features across coverage
        da_suffix
            Suffix of the mC fraction data array, default is "frac"
        plot
            If True, plot mean, coverage, and normalized dispersion scatter plots.
Returns
-------
pd.DataFrame
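
        Examples
        --------
        A minimal sketch; add_mc_frac() and add_feature_cov_mean() must have been run first,
        and the mc_type value is hypothetical::

            hvf_df = mcds.calculate_hvf(mc_type='CHN', var_dim='chrom100k', n_top_feature=5000, plot=False)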
"""
obs_dim = self._verify_dim(obs_dim, mode='obs')
var_dim = self._verify_dim(var_dim, mode='var')
frac_da = self[f"{var_dim}_da_{da_suffix}"]
if 'mc_type' in self.dims:
if mc_type is None:
mc_types = self.get_index('mc_type')
if mc_types.size > 1:
raise ValueError(f'Data array has multiple mc_types, '
f'please specify mc_type parameter.')
else:
mc_type = mc_types[0]
matrix = frac_da.sel(mc_type=mc_type).squeeze()
else:
matrix = frac_da.squeeze()
feature_mean_cov = self.coords[f"{var_dim}_cov_mean"]
hvf_df = highly_variable_methylation_feature(
cell_by_feature_matrix=matrix,
feature_mean_cov=feature_mean_cov,
obs_dim=obs_dim,
var_dim=var_dim,
min_disp=min_disp,
max_disp=max_disp,
min_mean=min_mean,
max_mean=max_mean,
n_top_feature=n_top_feature,
bin_min_features=bin_min_features,
mean_binsize=mean_binsize,
cov_binsize=cov_binsize,
)
selection = hvf_df["feature_select"]
print(f"Total Feature Number: {selection.size}")
print(
f"Highly Variable Feature: {selection.sum()} ({(selection.sum() / selection.size * 100):.1f}%)"
)
if plot:
plot_dispersion(
hvf_df,
hue="feature_select",
zlab="dispersion_norm",
data_quantile=(0.01, 0.99),
save_animate_path=None,
fig_kws=None,
)
for name, column in hvf_df.items():
if mc_type is None:
self.coords[f"{var_dim}_{name}"] = column
else:
self.coords[f"{var_dim}_{mc_type}_{name}"] = column
return hvf_df
    def get_score_adata(self, mc_type, quant_type, obs_dim=None, var_dim=None, sparse=True):
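        """
        Get an anndata.AnnData of hypo-/hyper-methylation scores for one mc_type.
        quant_type accepts 'hypo' or 'hyper' (expanded to 'hypo-score'/'hyper-score');
        if sparse is True, X is stored as a scipy.sparse.csr_matrix.
        """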
        QUANT_TYPES = ['hypo-score', 'hyper-score']
        # normalize shorthand quant_type values to the full names used in data array keys
        if quant_type.lower() == 'hypo':
            quant_type = 'hypo-score'
        elif quant_type.lower() == 'hyper':
            quant_type = 'hyper-score'
        if quant_type not in QUANT_TYPES:
            raise ValueError(f'quant_type needs to be one of {QUANT_TYPES}, got {quant_type}.')
obs_dim = self._verify_dim(obs_dim, mode='obs')
var_dim = self._verify_dim(var_dim, mode='var')
da = self[f'{var_dim}_da_{mc_type}-{quant_type}']
# TODO load by chunk and parallel to make IO faster
if sparse:
from scipy.sparse import csr_matrix
data = csr_matrix(da.transpose(obs_dim, var_dim).values)
else:
data = da.transpose(obs_dim, var_dim).values
# TODO validate chrom coords
obs_df, var_df = make_obs_df_var_df(da, obs_dim, var_dim)
adata = anndata.AnnData(X=data, obs=obs_df, var=var_df)
return adata
    def get_adata(
self,
mc_type=None,
obs_dim=None,
var_dim=None,
da_suffix="frac",
select_hvf=True,
split_large_chunks=True,
):
"""
        Get an anndata.AnnData from the MCDS mC fraction matrix.

        Parameters
        ----------
        mc_type
            mC type to select, one of the values of the 'mc_type' dimension
        obs_dim
            Name of the observation dimension
        var_dim
            Name of the variable (feature) dimension
        da_suffix
            Suffix of the mC fraction data array
        select_hvf
            Whether to select HVFs; if True, mcds.coords['{var_dim}_{mc_type}_feature_select'] is used to select HVFs
        split_large_chunks
            Whether to split large chunks via the dask config option array.slicing.split_large_chunks

        Returns
        -------
        anndata.AnnData
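
        Examples
        --------
        A minimal sketch, assuming calculate_hvf() was run for this mc_type::

            adata = mcds.get_adata(mc_type='CHN', var_dim='chrom100k', select_hvf=True)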
"""
obs_dim = self._verify_dim(obs_dim, mode='obs')
var_dim = self._verify_dim(var_dim, mode='var')
if 'mc_type' in self.dims:
if mc_type is None:
mc_types = self.get_index('mc_type')
if mc_types.size > 1:
raise ValueError(f'Data array has multiple mc_types, '
f'please specify mc_type parameter.')
else:
mc_type = mc_types[0]
else:
mc_type = None
with dask.config.set(
**{"array.slicing.split_large_chunks": split_large_chunks}
):
frac_da = self[f"{var_dim}_da_{da_suffix}"]
if mc_type is None:
if select_hvf:
try:
use_features = (
self.coords[f"{var_dim}_feature_select"]
.to_pandas()
.dropna()
.astype(bool)
)
use_features = use_features[use_features].index
use_data = frac_da.sel({var_dim: use_features}).squeeze()
except KeyError:
print(
"select_hvf is True, but no highly variable feature results found, "
"use all features instead."
)
use_data = frac_da.squeeze()
else:
use_data = frac_da.squeeze()
else:
if select_hvf:
try:
use_features = (
self.coords[f"{var_dim}_{mc_type}_feature_select"]
.to_pandas()
.dropna()
.astype(bool)
)
use_features = use_features[use_features].index
use_data = frac_da.sel({"mc_type": mc_type, var_dim: use_features}).squeeze()
except KeyError:
print(
"select_hvf is True, but no highly variable feature results found, "
"use all features instead."
)
use_data = frac_da.sel({"mc_type": mc_type}).squeeze()
else:
use_data = frac_da.sel({"mc_type": mc_type}).squeeze()
obs_df, var_df = make_obs_df_var_df(use_data, obs_dim, var_dim)
adata = anndata.AnnData(
X=use_data.transpose(obs_dim, var_dim).values, obs=obs_df, var=var_df
)
return adata
    def merge_cluster(
self,
cluster_col,
obs_dim=None,
add_mc_frac=True,
add_overall_mc=True,
overall_mc_da="chrom100k_da",
):
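        """
        Sum raw counts per cluster along obs_dim to build a cluster-level MCDS.
        If add_mc_frac, recompute mC fractions per cluster; if add_overall_mc,
        add per-cluster overall mC levels computed from overall_mc_da.
        """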
obs_dim = self._verify_dim(obs_dim, mode='obs')
        if not isinstance(cluster_col, str):
            # cluster_col is a pd.Series of cluster labels: store it as a coordinate and use its name
            self.coords[cluster_col.name] = cluster_col
            cluster_col = cluster_col.name
with dask.config.set(**{"array.slicing.split_large_chunks": True}):
cluster_mcds = self.groupby(cluster_col).sum(dim=obs_dim).load()
cluster_mcds = MCDS(cluster_mcds)
if add_mc_frac:
for name, da in cluster_mcds.data_vars.items():
if name.endswith("_da") and ("count_type" in da.dims):
cluster_mcds.add_mc_frac(var_dim=name[:-3])
if add_overall_mc:
if 'mc_type' in cluster_mcds.dims:
for mc_type in cluster_mcds.get_index("mc_type"):
overall_mc_dim = overall_mc_da[:-3]
mc = (
cluster_mcds[overall_mc_da]
.sel(mc_type=mc_type, count_type="mc")
.sum(dim=overall_mc_dim)
)
cov = (
cluster_mcds[overall_mc_da]
.sel(mc_type=mc_type, count_type="cov")
.sum(dim=overall_mc_dim)
)
cluster_mcds.coords[f"{cluster_col}_{mc_type}_overall"] = mc / cov
else:
overall_mc_dim = overall_mc_da[:-3]
mc = cluster_mcds[overall_mc_da].sel(count_type="mc").sum(dim=overall_mc_dim)
cov = cluster_mcds[overall_mc_da].sel(count_type="cov").sum(dim=overall_mc_dim)
cluster_mcds.coords[f"{cluster_col}_overall"] = mc / cov
return cluster_mcds
    def to_region_ds(self, region_dim=None):
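        """Convert this MCDS to a RegionDS, keyed on region_dim (defaults to the MCDS var_dim)."""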
region_dim = self._verify_dim(region_dim, mode='var')
from .region_ds import RegionDS
_region_ds = RegionDS(self, region_dim=region_dim)
return _region_ds
    def write_dataset(self,
output_path,
mode='w-',
obs_dim=None,
var_dims: Union[str, list] = None,
use_obs=None,
chunk_size=1000):
"""
        Write the MCDS into an on-disk zarr dataset. Data arrays for each var_dim are saved in separate
        sub-directories of output_path.

        Parameters
        ----------
        output_path
            Path of the zarr dataset
        mode
            'w-' means write to output_path and fail if the path exists; 'w' means write to output_path
            and overwrite any existing var_dim sub-directory
        obs_dim
            Dimension name of observations
        var_dims
            Dimension name, or a list of dimension names, of variables
        use_obs
            Select and order observations when writing
        chunk_size
            The chunk size for loading and writing; set this as large as available memory permits
Returns
-------
output_path
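
        Examples
        --------
        A minimal sketch; the output path is hypothetical::

            mcds.write_dataset('output.mcds', mode='w-', var_dims=['chrom100k'], chunk_size=1000)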
"""
        # save the MCDS to a location; if the location is an existing MCDS dataset, save to a sub-directory and update .ALLCools
        # to handle large saves, load by chunk and write by append
obs_dim = self._verify_dim(obs_dim, mode='obs')
if var_dims is None:
var_dims = [self._verify_dim(None, mode='var')]
else:
if isinstance(var_dims, str):
var_dims = [var_dims]
else:
pass
# determine output_path type
output_path = pathlib.Path(output_path)
config_path = pathlib.Path(f'{output_path}/.ALLCools')
if config_path.exists():
if mode == 'w-':
                raise FileExistsError(f'{output_path} already exists. Use mode="a" '
                                      f'to add additional data arrays to the existing dataset, '
                                      f'or mode="w" to overwrite the existing dataset.')
else:
import shutil
shutil.rmtree(output_path)
# separate data_vars by var_dim
data_vars_dict = {k: [] for k in var_dims}
        for da_name, da in self.data_vars.items():
            matched_dims = [var_dim for var_dim in var_dims if var_dim in da.dims]
            if len(matched_dims) == 1:
                data_vars_dict[matched_dims[0]].append(da_name)
            else:
                raise ValueError(f'{da_name} must contain exactly one var_dim from {var_dims}. '
                                 f'Dims of {da_name} are {da.dims}.')
obs_index = self.get_index(obs_dim)
for var_dim, da_names in data_vars_dict.items():
_this_output_path = f'{output_path}/{var_dim}'
if pathlib.Path(_this_output_path).exists():
                print(f'{output_path}/{var_dim} will be overwritten by the new data array')
if len(da_names) == 0:
print(f'No data for var_dim {var_dim}.')
else:
_mcds = self[da_names]
# pop out obs_dim and var_dim from attrs, do not save these to dataset
_mcds.attrs.pop('obs_dim')
_mcds.attrs.pop('var_dim')
for i, chunk_start in enumerate(range(0, obs_index.size, chunk_size)):
if use_obs is None:
_obs_chunk = obs_index[chunk_start:chunk_start + chunk_size]
_chunk_ds = _mcds.sel({obs_dim: _obs_chunk}).load()
else:
_obs_chunk = use_obs[chunk_start:chunk_start + chunk_size]
# load first, order next, to prevent unordered index causing more chunks
_chunk_ds = _mcds.sel({obs_dim: obs_index.isin(_obs_chunk)}).load()
_chunk_ds = _chunk_ds.sel({obs_dim: _obs_chunk})
if i == 0:
_chunk_ds.to_zarr(_this_output_path, mode='w')
else:
_chunk_ds.to_zarr(_this_output_path, append_dim=obs_dim)
# write config
from .utilities import update_dataset_config
update_dataset_config(output_path,
add_ds_region_dim={var_dim: var_dim
for var_dim, da_list in data_vars_dict.items()
if len(da_list) > 0},
add_ds_sample_dim={var_dim: obs_dim
for var_dim, da_list in data_vars_dict.items()
if len(da_list) > 0})
return output_path