import subprocess
import glob
import logging
import pathlib
import numpy as np
import pandas as pd
import xarray as xr
import warnings
from typing import Union
import yaml
[docs]log = logging.getLogger()
[docs]def calculate_posterior_mc_frac(
mc_da, cov_da, var_dim=None, normalize_per_cell=True, clip_norm_value=10
):
# so we can do post_frac only in a very small set of gene to prevent memory issue
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# here we expected to see a true_divide warning due to cov=0
raw_frac = mc_da / cov_da
if isinstance(raw_frac, np.ndarray):
# np.ndarray
ndarray = True
else:
ndarray = False
if ndarray:
cell_rate_mean = np.nanmean(raw_frac, axis=1)
cell_rate_var = np.nanvar(raw_frac, axis=1)
else:
# assume xr.DataArray
if var_dim is None:
cell_rate_mean = raw_frac.mean(axis=1) # this skip na
cell_rate_var = raw_frac.var(axis=1) # this skip na
else:
cell_rate_mean = raw_frac.mean(dim=var_dim) # this skip na
cell_rate_var = raw_frac.var(dim=var_dim) # this skip na
# based on beta distribution mean, var
# a / (a + b) = cell_rate_mean
# a * b / ((a + b) ^ 2 * (a + b + 1)) = cell_rate_var
# calculate alpha beta value for each cell
cell_a = (1 - cell_rate_mean) * (
cell_rate_mean ** 2
) / cell_rate_var - cell_rate_mean
cell_b = cell_a * (1 / cell_rate_mean - 1)
# cell specific posterior rate
post_frac: Union[np.ndarray, xr.DataArray]
if ndarray:
post_frac = (mc_da + cell_a[:, None]) / (
cov_da + cell_a[:, None] + cell_b[:, None]
)
else:
post_frac = (mc_da + cell_a) / (cov_da + cell_a + cell_b)
if normalize_per_cell:
# there are two ways of normalizing per cell, by posterior or prior mean:
# prior_mean = cell_a / (cell_a + cell_b)
# posterior_mean = post_rate.mean(dim=var_dim)
# Here I choose to use prior_mean to normalize cell,
# therefore all cov == 0 features will have normalized rate == 1 in all cells.
# i.e. 0 cov feature will provide no info
prior_mean = cell_a / (cell_a + cell_b)
if ndarray:
post_frac = post_frac / prior_mean[:, None]
else:
post_frac = post_frac / prior_mean
if clip_norm_value is not None:
if isinstance(post_frac, np.ndarray):
# np.ndarray
post_frac[post_frac > clip_norm_value] = clip_norm_value
else:
# xarray.DataArray
post_frac = post_frac.where(
post_frac < clip_norm_value, clip_norm_value
)
return post_frac
[docs]def calculate_posterior_mc_frac_lazy(
mc_da,
cov_da,
var_dim,
output_prefix,
cell_chunk=20000,
dask_cell_chunk=500,
normalize_per_cell=True,
clip_norm_value=10,
):
"""
Running calculate_posterior_mc_rate with dask array and directly save to disk.
This is highly memory efficient. Use this for dataset larger then machine memory.
Parameters
----------
mc_da
cov_da
var_dim
output_prefix
cell_chunk
dask_cell_chunk
normalize_per_cell
clip_norm_value
Returns
-------
"""
cell_list = mc_da.get_index("cell")
cell_chunks = [
cell_list[i: i + cell_chunk] for i in range(0, cell_list.size, cell_chunk)
]
output_paths = []
for chunk_id, cell_list_chunk in enumerate(cell_chunks):
_mc_da = mc_da.sel(cell=cell_list_chunk)
_cov_da = cov_da.sel(cell=cell_list_chunk)
post_rate = calculate_posterior_mc_frac(
mc_da=_mc_da,
cov_da=_cov_da,
var_dim=var_dim,
normalize_per_cell=normalize_per_cell,
clip_norm_value=clip_norm_value,
)
if len(cell_chunks) == 1:
chunk_id = ""
else:
chunk_id = f".{chunk_id}"
output_path = output_prefix + f".{var_dim}_da_frac{chunk_id}.mcds"
# to_netcdf trigger the dask computation, and save output directly into disk, quite memory efficient
post_rate.to_netcdf(output_path)
output_paths.append(output_path)
chunks = {"cell": dask_cell_chunk}
total_post_rate = xr.concat(
[xr.open_dataarray(path, chunks=chunks) for path in output_paths], dim="cell"
)
return total_post_rate
[docs]def calculate_gch_rate(mcds, var_dim="chrom100k"):
rate_da = mcds.sel(mc_type=["GCHN", "HCHN"]).add_mc_rate(
dim=var_dim, da=f"{var_dim}_da", normalize_per_cell=False, inplace=False
)
# (PCG - PCH) / (1 - PCH)
real_gc_rate = (rate_da.sel(mc_type="GCHN") - rate_da.sel(mc_type="HCHN")) / (
1 - rate_da.sel(mc_type="HCHN")
)
real_gc_rate = real_gc_rate.transpose("cell", var_dim).values
real_gc_rate[real_gc_rate < 0] = 0
# norm per cell
cell_overall_count = (
mcds[f"{var_dim}_da"].sel(mc_type=["GCHN", "HCHN"]).sum(dim=var_dim)
)
cell_overall_rate = cell_overall_count.sel(
count_type="mc"
) / cell_overall_count.sel(count_type="cov")
gchn = cell_overall_rate.sel(mc_type="GCHN")
hchn = cell_overall_rate.sel(mc_type="HCHN")
overall_gchn = (gchn - hchn) / (1 - hchn)
real_gc_rate = real_gc_rate / overall_gchn.values[:, None]
return real_gc_rate
[docs]def get_mean_dispersion(x, obs_dim):
# mean
mean = x.mean(dim=obs_dim).load()
# var
# enforce R convention (unbiased estimator) for variance
mean_sq = (x * x).mean(dim=obs_dim).load()
var = (mean_sq - mean ** 2) * (x.sizes[obs_dim] / (x.sizes[obs_dim] - 1))
# now actually compute the dispersion
mean.where(mean > 1e-12, 1e-12) # set entries equal to zero to small value
# raw dispersion is the variance normalized by mean
dispersion = var / mean
return mean, dispersion
[docs]def highly_variable_methylation_feature(
cell_by_feature_matrix,
feature_mean_cov,
obs_dim=None,
var_dim=None,
min_disp=0.5,
max_disp=None,
min_mean=0,
max_mean=5,
n_top_feature=None,
bin_min_features=5,
mean_binsize=0.05,
cov_binsize=100,
):
"""
Adapted from Scanpy, the main difference is that,
this function normalize dispersion based on both mean and cov bins.
"""
# RNA is only scaled by mean, but methylation is scaled by both mean and cov
log.info("extracting highly variable features")
if n_top_feature is not None:
log.info(
"If you pass `n_top_feature`, all cutoffs are ignored. "
"Features are ordered by normalized dispersion."
)
# warning for extremely low cov
low_cov_portion = (feature_mean_cov < 10).sum() / feature_mean_cov.size
if low_cov_portion > 0.2:
log.warning(
f"{int(low_cov_portion * 100)}% feature with < 10 mean cov, "
f"consider filter by cov before find highly variable feature. "
f"Otherwise some low coverage feature may be elevated after normalization."
)
if len(cell_by_feature_matrix.dims) != 2:
raise ValueError(
f"Input cell_by_feature_matrix is not 2-D matrix, "
f"got {len(cell_by_feature_matrix.dims)} dim(s)"
)
else:
if (obs_dim is None) or (var_dim is None):
obs_dim, var_dim = cell_by_feature_matrix.dims
# rename variable
x = cell_by_feature_matrix
cov = feature_mean_cov
mean, dispersion = get_mean_dispersion(x, obs_dim=obs_dim)
dispersion = np.log(dispersion)
# all of the following quantities are "per-feature" here
df = pd.DataFrame(index=cell_by_feature_matrix.get_index(var_dim))
df["mean"] = mean.to_pandas().copy()
df["dispersion"] = dispersion.to_pandas().copy()
df["cov"] = cov.to_pandas().copy()
# instead of n_bins, use bin_size, because cov and mc are in different scale
df["mean_bin"] = (df["mean"] / mean_binsize).astype(int)
df["cov_bin"] = (df["cov"] / cov_binsize).astype(int)
# save bin_count df, gather bins with more than bin_min_features features
bin_count = (
df.groupby(["mean_bin", "cov_bin"])
.apply(lambda i: i.shape[0])
.reset_index()
.sort_values(0, ascending=False)
)
bin_count.head()
bin_more_than = bin_count[bin_count[0] > bin_min_features]
if bin_more_than.shape[0] == 0:
raise ValueError(
f"No bin have more than {bin_min_features} features, use larger bin size."
)
# for those bin have too less features, merge them with closest bin in manhattan distance
# this usually don't cause much difference (a few hundred features), but the scatter plot will look more nature
index_map = {}
for _, (mean_id, cov_id, count) in bin_count.iterrows():
if count > 1:
index_map[(mean_id, cov_id)] = (mean_id, cov_id)
manhattan_dist = (bin_more_than["mean_bin"] - mean_id).abs() + (
bin_more_than["cov_bin"] - cov_id
).abs()
closest_more_than = manhattan_dist.sort_values().index[0]
closest = bin_more_than.loc[closest_more_than]
index_map[(mean_id, cov_id)] = tuple(closest.tolist()[:2])
# apply index_map to original df
raw_bin = df[["mean_bin", "cov_bin"]].set_index(["mean_bin", "cov_bin"])
raw_bin["use_mean"] = pd.Series(index_map).apply(lambda i: i[0])
raw_bin["use_cov"] = pd.Series(index_map).apply(lambda i: i[1])
df["mean_bin"] = raw_bin["use_mean"].values
df["cov_bin"] = raw_bin["use_cov"].values
# calculate bin mean and std, now disp_std_bin shouldn't have NAs
disp_grouped = df.groupby(["mean_bin", "cov_bin"])["dispersion"]
disp_mean_bin = disp_grouped.mean()
disp_std_bin = disp_grouped.std(ddof=1)
# actually do the normalization
_mean_norm = disp_mean_bin.loc[list(zip(df["mean_bin"], df["cov_bin"]))]
_std_norm = disp_std_bin.loc[list(zip(df["mean_bin"], df["cov_bin"]))]
df["dispersion_norm"] = (
df["dispersion"].values - _mean_norm.values # use values here as index differs
) / _std_norm.values
dispersion_norm = df["dispersion_norm"].values.astype("float32")
# Select n_top_feature
if n_top_feature is not None:
feature_subset = df.index.isin(
df.sort_values("dispersion_norm", ascending=False).index[:5000]
)
else:
max_disp = np.inf if max_disp is None else max_disp
dispersion_norm[np.isnan(dispersion_norm)] = 0 # similar to Seurat
feature_subset = np.logical_and.reduce(
(
mean > min_mean,
mean < max_mean,
dispersion_norm > min_disp,
dispersion_norm < max_disp,
)
)
df["feature_select"] = feature_subset
log.info(" finished")
return df
[docs]def determine_engine(dataset_paths):
def _single_path(path):
if pathlib.Path(f"{path}/.zgroup").exists():
e = "zarr"
else:
e = None # default for None is netcdf4 or xarray will guess
return e
def _multi_paths(paths):
engines = []
for path in paths:
e = _single_path(path)
engines.append(e)
_engine = list(set(engines))
if len(_engine) > 1:
raise ValueError(
f'Can not open a mixture of netcdf4 and zarr files in "{dataset_paths}", please use '
f'`allcools convert-mcds-to-zarr {{dataset_paths}}` to convert the storage of all MCDS files '
f'to zarr or netcdf. We recommend you use the zarr format, which gives better IO '
f'performance according to our experience.'
)
else:
_engine = _engine[0]
return _engine
if isinstance(dataset_paths, (str, pathlib.PosixPath)):
if "*" in str(dataset_paths):
engine = _multi_paths(list(glob.glob(dataset_paths)))
else:
# single mcds path
engine = _single_path(dataset_paths)
else:
if len(dataset_paths) == 1:
engine = _single_path(dataset_paths[0])
else:
engine = _multi_paths(dataset_paths)
return engine
[docs]def obj_to_str(ds, coord_dtypes=None):
if coord_dtypes is None:
coord_dtypes = {}
for k, v in ds.coords.items():
if np.issubdtype(v, np.object) or np.issubdtype(v, np.unicode):
if k in coord_dtypes:
ds.coords[k] = v.load().astype(coord_dtypes[k])
else:
ds.coords[k] = v.load().astype(str)
return
[docs]def write_ordered_chunks(
chunks_to_write,
final_path,
append_dim,
engine="zarr",
coord_dtypes=None,
dtype=None,
):
# some function may return None if the chunk is empty
chunks_to_write = {k: v for k, v in chunks_to_write.items() if v is not None}
# open all chunks to determine string coords dtype lengths
total_ds = xr.open_mfdataset(
list(chunks_to_write.values()),
concat_dim=append_dim,
combine="nested",
engine=engine,
)
obj_to_str(total_ds, coord_dtypes)
# string dtype is set to coord maximum length, so need to load all coords to determine
# otherwise the chunk writing will truncate string coords if the dtype length is wrong
coord_dtypes = {k: v.dtype for k, v in total_ds.coords.items()}
del total_ds
# write chunks
wrote = False
for chunk_i, output_path in sorted(chunks_to_write.items(), key=lambda _i: _i[0]):
# save chunk into the zarr
chunk_ds = xr.open_dataset(output_path, engine=engine).load()
obj_to_str(chunk_ds, coord_dtypes)
if dtype is not None:
chunk_ds = chunk_ds.astype(dtype)
if not wrote:
wrote = True
# create the new da
chunk_ds.to_zarr(final_path, mode="w")
else:
chunk_ds.to_zarr(final_path, mode="a", append_dim=append_dim)
return
[docs]def convert_to_zarr(paths):
"""Convert xarray.Dataset stored in other backends into zarr backend."""
def _convert_single_path(p):
if not pathlib.Path(p).exists():
raise FileNotFoundError(f'{p} not exist.')
tmp_p = f'{p}_convert_tmp'
if determine_engine(tmp_p) != 'zarr':
ds = xr.open_dataset(p)
# this will load the whole dataset
ds.to_zarr(tmp_p)
try:
subprocess.run(['mv', p, f'{p}_to_delete'],
check=True,
stderr=subprocess.PIPE,
encoding='utf8')
subprocess.run(['mv', tmp_p, p],
check=True,
stderr=subprocess.PIPE,
encoding='utf8')
subprocess.run(['rm', '-rf', f'{p}_to_delete'],
check=True,
stderr=subprocess.PIPE,
encoding='utf8')
except subprocess.CalledProcessError as e:
print(e.stderr)
raise e
return
if isinstance(paths, (str, pathlib.PosixPath)):
paths = str(paths)
if '*' in paths:
for path in glob.glob(paths):
_convert_single_path(path)
else:
_convert_single_path(paths)
else:
for path in paths:
convert_to_zarr(path)
return
[docs]def update_dataset_config(output_dir, add_ds_region_dim=None, change_region_dim=None, config=None,
add_ds_sample_dim=None):
# update RegionDS default dimension
try:
with open(f"{output_dir}/.ALLCools", "r") as f:
_config = yaml.load(f, yaml.SafeLoader)
except FileNotFoundError:
_config = {"region_dim": None, "ds_region_dim": {}, "ds_sample_dim": {}}
if config is not None:
_config.update(config)
if change_region_dim is not None:
_config["region_dim"] = change_region_dim
if add_ds_region_dim is not None:
_config["ds_region_dim"].update(add_ds_region_dim)
if add_ds_sample_dim is not None:
_config["ds_sample_dim"].update(add_ds_sample_dim)
with open(f"{output_dir}/.ALLCools", "w") as f:
yaml.dump(_config, f)
return