Source code for sodirac.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 5/17/23 2:58 PM
# @Author  : Chang Xu
# @File    : utils.py
# @Email   : changxu@nus.edu.sg

import os
import sys
import random
import logging
from builtins import range
from typing import Union, Callable, Any, Iterable, List, Optional, Tuple, Sequence, Dict

import anndata
import numpy as np
import pandas as pd
import scanpy as sc
import sklearn
import torch
import tqdm
from scipy import sparse, stats
from sklearn.metrics import pairwise_distances, calinski_harabasz_score
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.neighbors import NearestNeighbors, KNeighborsRegressor
from sklearn.preprocessing import normalize
from sklearn.neighbors import kneighbors_graph
from torch_geometric.nn import knn_graph, radius_graph

EdgeList = List[Tuple[int, int]]


[docs]def append_categorical_to_data( X: Union[np.ndarray, sparse.csr.csr_matrix], categorical: np.ndarray, ) -> Tuple[Union[np.ndarray, sparse.csr.csr_matrix], np.ndarray]: """ Append a one-hot encoding of a categorical vector to each sample in `X`. Parameters ---------- X : np.ndarray or sparse.csr_matrix Shape [cells, features]. Feature matrix. categorical : np.ndarray Shape [cells,]. Categorical labels per cell. Returns ------- Xa : np.ndarray or sparse.csr_matrix Shape [cells, features + n_categories]. Matrix with one-hot appended. categories : np.ndarray Shape [n_categories,]. Category names in the order used for one-hot. Examples -------- >>> X_aug, cats = append_categorical_to_data(X, adata.obs["batch"].values) Notes ----- Uses `pd.Categorical(...).codes` to derive integer label indices, then a one-hot encoding (via `make_one_hot`) that is concatenated to `X`. """ labels = pd.Categorical(categorical) idx = np.array(labels.codes) idx = torch.from_numpy(idx.astype("int32")).long() categories = np.array(labels.categories) one_hot_mat = make_one_hot(idx, C=len(categories)).numpy() assert X.shape[0] == one_hot_mat.shape[0], f"dims unequal at {X.shape[0]}, {one_hot_mat.shape[0]}" if sparse.issparse(X): X = sparse.hstack([X, one_hot_mat]) else: X = np.concatenate([X, one_hot_mat], axis=1) return X, categories
[docs]def get_adata_asarray( adata: anndata.AnnData, ) -> Union[np.ndarray, sparse.csr.csr_matrix]: """ Materialize `adata.X` as an array or CSR matrix (no view). Parameters ---------- adata : anndata.AnnData AnnData object with `.X` of shape [cells, genes]. Returns ------- X : np.ndarray or sparse.csr_matrix Concrete in-memory copy of `adata.X` with matching type. Notes ----- Preserves the dense/sparse form of the original `.X`. """ if sparse.issparse(adata.X): X = sparse.csr.csr_matrix(adata.X) else: X = np.array(adata.X) return X
[docs]def build_classification_matrix( X: Union[np.ndarray, sparse.csr.csr_matrix], model_genes: np.ndarray, sample_genes: np.ndarray, gene_batch_size: int = 512, ) -> Union[np.ndarray, sparse.csr.csr_matrix]: """ Reindex a count matrix to the model's gene order, filling missing genes with zeros. Parameters ---------- X : np.ndarray or sparse.csr_matrix Shape [cells, genes]. Count matrix for the sample. model_genes : np.ndarray Expected gene identifiers in model order. sample_genes : np.ndarray Gene identifiers (columns) for `X`. gene_batch_size : int, default 512 Number of genes to copy per batch (speed vs. memory trade-off). Returns ------- N : np.ndarray or sparse.csr_matrix Shape [cells, len(model_genes)]. Reindexed counts; zeros for absent genes. Notes ----- If `model_genes` exactly matches `sample_genes`, returns `X` unchanged. Otherwise, constructs a new matrix with columns in model order and copies overlapping genes in batches to control memory usage. """ if type(X) not in (np.ndarray, sparse.csr.csr_matrix): raise TypeError(f"X is type {type(X)}, must be `np.ndarray` or `sparse.csr_matrix`") n_cells = X.shape[0] if len(model_genes) == len(sample_genes) and np.all(model_genes == sample_genes): print("Gene names match exactly, returning input.") return X if isinstance(X, np.ndarray): N = np.zeros((n_cells, len(model_genes))) else: N = sparse.lil_matrix((n_cells, len(model_genes))) model_genes_indices = [] sample_genes_indices = [] common_genes = 0 for i, g in tqdm.tqdm(enumerate(sample_genes), desc="mapping genes"): if np.sum(g == model_genes) > 0: model_genes_indices.append(int(np.where(g == model_genes)[0])) sample_genes_indices.append(i) common_genes += 1 gene_idx = 0 n_batches = int(np.ceil(len(model_genes_indices) / gene_batch_size)) for _ in tqdm.tqdm(range(n_batches), desc="copying gene batches"): model_batch_idx = model_genes_indices[gene_idx: gene_idx + gene_batch_size] sample_batch_idx = sample_genes_indices[gene_idx: gene_idx + gene_batch_size] if len(model_batch_idx) == 0: break N[:, model_batch_idx] = X[:, sample_batch_idx] gene_idx += gene_batch_size if sparse.issparse(N): N = sparse.csr_matrix(N) print(f"Found {common_genes} common genes.") return N
[docs]def knn_smooth_pred_class( X: np.ndarray, pred_class: np.ndarray, grouping: Optional[np.ndarray] = None, k: int = 15, ) -> np.ndarray: """ Smooth class labels by majority vote among k-nearest neighbors. Parameters ---------- X : np.ndarray Shape [N, features]. Embedding used for neighbor search. pred_class : np.ndarray Shape [N,]. Class labels to be smoothed. grouping : np.ndarray, optional Shape [N,]. Group IDs restricting neighbors to within-group only. If None, all cells are considered a single group. k : int, default 15 Number of neighbors to use. Returns ------- smooth_pred_class : np.ndarray Shape [N,]. Smoothed class labels. Notes ----- For each group (or globally), builds a kNN graph and assigns to each cell the majority class among its k neighbors (including or excluding itself depending on scikit-learn defaults used here). """ if grouping is None: grouping = np.zeros(X.shape[0]) smooth_pred_class = np.zeros_like(pred_class) for group in np.unique(grouping): group_idx = np.where(grouping == group)[0].astype("int") X_group = X[grouping == group, :] k_use = min(k, X_group.shape[0]) nns = NearestNeighbors(n_neighbors=k_use).fit(X_group) _, idx = nns.kneighbors(X_group) for i in range(X_group.shape[0]): classes = pred_class[group_idx[idx[i, :]]] uniq_classes, counts = np.unique(classes, return_counts=True) maj_class = uniq_classes[int(np.argmax(counts))] smooth_pred_class[group_idx[i]] = maj_class return smooth_pred_class
[docs]def knn_smooth_pred_class_prob( X: np.ndarray, pred_probs: np.ndarray, names: np.ndarray, grouping: Optional[np.ndarray] = None, k: Union[Callable[[int], int], int] = 15, dm: Optional[np.ndarray] = None, **kwargs: Any, ) -> np.ndarray: """ Smooth class probabilities by kNN regression with RBF distance weights. Parameters ---------- X : np.ndarray Shape [N, features]. Embedding used for neighbor search. pred_probs : np.ndarray Shape [N, C]. Class prediction probabilities per cell. names : np.ndarray Shape [C,]. Class names corresponding to columns of `pred_probs`. grouping : np.ndarray, optional Shape [N,]. Group IDs restricting neighbors to within-group only. If None, all cells are considered a single group. k : Callable[[int], int] or int, default 15 If callable, receives the group size and returns k for that group; otherwise a fixed k is used. dm : np.ndarray, optional Shape [N, N]. Precomputed distance matrix to set the RBF kernel parameter efficiently. **kwargs : Any Additional kwargs forwarded to `KNeighborsRegressor`. Returns ------- smooth_pred_class : np.ndarray Shape [N,]. Class labels from argmax of smoothed probabilities. Examples -------- >>> smooth = knn_smooth_pred_class_prob(X, probs, class_names, grouping=clusters, k=15) Notes ----- Uses `RBFWeight` to set kernel width from median pairwise distance, then applies weighted kNN regression to smooth class probabilities within each group. Class labels are taken as argmax of the smoothed probabilities. """ if grouping is None: grouping = np.zeros(X.shape[0]) smooth_pred_probs = np.zeros_like(pred_probs) smooth_pred_class = np.zeros(pred_probs.shape[0], dtype="object") for group in np.unique(grouping): group_idx = np.where(grouping == group)[0].astype("int") X_group = X[grouping == group, :] y_group = pred_probs[grouping == group, :] k_use = k(X_group.shape[0]) if callable(k) else k k_use = min(k_use, X_group.shape[0]) rbf = RBFWeight() rbf.set_alpha(X=X_group, n_max=None, dm=dm) if "dm" in kwargs: del kwargs["dm"] nns = KNeighborsRegressor( n_neighbors=k_use, weights=rbf, **kwargs, ).fit(X_group, y_group) smoothed_probs = nns.predict(X_group) smooth_pred_probs[group_idx, :] = smoothed_probs g_classes = names[np.argmax(smoothed_probs, axis=1)] smooth_pred_class[group_idx] = g_classes return smooth_pred_class
[docs]def argmax_pred_class( grouping: np.ndarray, prediction: np.ndarray, ) -> np.ndarray: """ Assign groupwise majority class to all elements in each group. Parameters ---------- grouping : np.ndarray Shape [N,]. Group IDs for each element. prediction : np.ndarray Shape [N,]. Predicted class for each element. Returns ------- assigned_classes : np.ndarray Shape [N,]. Majority class per group applied to all elements. Examples -------- >>> grouping = np.array([0,0,0,1,1,1,2,2,2,2]) >>> prediction = np.array(['A','A','A','B','A','B','C','A','B','C']) >>> argmax_pred_class(grouping, prediction) array(['A','A','A','B','B','B','C','C','C','C'], dtype=object) Notes ----- Useful when leveraging cluster assignments from another method to simplify cell-level labels to cluster-level majorities. """ assert grouping.shape[0] == prediction.shape[0], "`grouping` and `prediction` must be the same length" groups = sorted(list(set(grouping.tolist()))) assigned_classes = np.zeros(grouping.shape[0], dtype="object") for group in groups: classes, counts = np.unique(prediction[grouping == group], return_counts=True) majority_class = classes[np.argmax(counts)] assigned_classes[grouping == group] = majority_class return assigned_classes
[docs]def compute_entropy_of_mixing( X: np.ndarray, y: np.ndarray, n_neighbors: int, n_iters: Optional[int] = None, **kwargs: Any, ) -> np.ndarray: """ Compute entropy of group mixing in local neighborhoods. Parameters ---------- X : np.ndarray Shape [N, P]. Feature matrix used for neighbor search. y : np.ndarray Shape [N,]. Discrete group labels. n_neighbors : int Number of neighbors drawn when computing local distributions. n_iters : int, optional Number of random query points to evaluate. If None, uses all points. **kwargs : Any Additional keyword arguments forwarded to `NearestNeighbors`. Returns ------- entropy_of_mixing : np.ndarray Shape [n_iters,]. Entropy values per query point (in nats). Notes ----- For each query point, counts group membership among its k-nearest neighbors and computes entropy of the resulting probability vector. """ n_neighbors = min(n_neighbors, X.shape[0]) nn = NearestNeighbors(n_neighbors=n_neighbors, metric="euclidean", **kwargs).fit(X) nn_idx = nn.kneighbors(return_distance=False) if n_iters is not None: n_iters = min(n_iters, X.shape[0]) if (n_iters is None) or (n_iters == X.shape[0]): query_points = np.arange(X.shape[0]) else: assert n_iters < X.shape[0] query_points = np.random.choice(X.shape[0], size=n_iters, replace=False) entropy_of_mixing = np.zeros(len(query_points)) for i, ridx in enumerate(query_points): nn_y = y[nn_idx[ridx, :]] uniques = np.unique(y) nn_y_p = np.zeros(len(uniques)) for j, v in enumerate(uniques): nn_y_p[j] = np.sum(nn_y == v) nn_y_p = nn_y_p / nn_y_p.sum() H = stats.entropy(nn_y_p) entropy_of_mixing[i] = H return entropy_of_mixing
[docs]def pp_adatas( adata_sc: anndata.AnnData, adata_sp: anndata.AnnData, genes: Optional[Iterable[str]] = None, gene_to_lowercase: bool = True, ) -> None: """ Preprocess single-cell and spatial AnnData to align genes and compute density priors. Parameters ---------- adata_sc : anndata.AnnData Single-cell AnnData. adata_sp : anndata.AnnData Spatial expression AnnData. genes : Iterable[str], optional Marker genes to use. If None, all genes from `adata_sc` are considered. gene_to_lowercase : bool, default True If True, lowercases all gene names to align case between modalities. Returns ------- None Notes ----- - Filters out all-zero genes in both datasets. - Stores shared training genes in `.uns["training_genes"]`. - Stores overlapping genes in `.uns["overlap_genes"]`. - Computes uniform and RNA-count-based density priors in `adata_sp.obs`. """ sc.pp.filter_genes(adata_sc, min_cells=1) sc.pp.filter_genes(adata_sp, min_cells=1) if genes is None: genes = adata_sc.var.index if gene_to_lowercase: adata_sc.var.index = [g.lower() for g in adata_sc.var.index] adata_sp.var.index = [g.lower() for g in adata_sp.var.index] genes = [g.lower() for g in genes] adata_sc.var_names_make_unique() adata_sp.var_names_make_unique() genes = list(set(genes) & set(adata_sc.var.index) & set(adata_sp.var.index)) adata_sc.uns["training_genes"] = genes adata_sp.uns["training_genes"] = genes logging.info("%d training genes saved to `.uns['training_genes']`.", len(genes)) overlap_genes = list(set(adata_sc.var.index) & set(adata_sp.var.index)) adata_sc.uns["overlap_genes"] = overlap_genes adata_sp.uns["overlap_genes"] = overlap_genes logging.info("%d overlap genes saved to `.uns['overlap_genes']`.", len(overlap_genes)) adata_sp.obs["uniform_density"] = np.ones(adata_sp.X.shape[0]) / adata_sp.X.shape[0] logging.info("Uniform density prior written to `adata_sp.obs['uniform_density']`.") rna_count_per_spot = np.array(adata_sp.X.sum(axis=1)).squeeze() adata_sp.obs["rna_count_based_density"] = rna_count_per_spot / np.sum(rna_count_per_spot) logging.info("RNA count-based density prior written to `adata_sp.obs['rna_count_based_density']`.")
[docs]class RBFWeight(object): """ Radial basis function (Gaussian) weight generator for distances. Parameters ---------- alpha : float, optional RBF parameter (1 / (2 * sigma^2)). If not set, must call `set_alpha`. Notes ----- Weights follow: w(r) = exp(- (alpha * r)^2 ). """ def __init__(self, alpha: Optional[float] = None) -> None: self.alpha = alpha
[docs] def set_alpha( self, X: np.ndarray, n_max: Optional[int] = None, dm: Optional[np.ndarray] = None, ) -> None: """ Estimate `alpha` from the median pairwise distance. Parameters ---------- X : np.ndarray Shape [N, P]. Observations. n_max : int, optional Max observations to subsample for the median distance computation. dm : np.ndarray, optional Shape [N, N]. Precomputed distance matrix; if provided, speeds up estimation. Returns ------- None References ---------- Gretton et al., "A Kernel Two-Sample Test", JMLR 13(Mar):723–773, 2012. """ if n_max is None: n_max = X.shape[0] if dm is None: if X.shape[0] > n_max: ridx = np.random.choice(X.shape[0], size=n_max, replace=False) X_p = X[ridx, :] else: X_p = X dm = euclidean_distances(X_p) upper = dm[np.triu_indices_from(dm, k=1)] sigma = np.median(upper, overwrite_input=True) self.alpha = 1.0 / (2 * (sigma ** 2))
def __call__(self, distances: np.ndarray) -> np.ndarray: """ Compute RBF weights for given distances. Parameters ---------- distances : np.ndarray Shape [N,]. Distances to weight. Returns ------- weights : np.ndarray Shape [N,]. RBF weights. Notes ----- Requires `self.alpha` to be set via constructor or `set_alpha`. """ if self.alpha is None: msg = "must set `alpha` attribute before computing weights.\n" msg += "use `.set_alpha()` method to estimate from data." raise ValueError(msg) weights = np.exp(-((self.alpha * distances) ** 2)) return weights
[docs]def adata_to_cluster_expression( adata: anndata.AnnData, cluster_label: str, scale: bool = True, add_density: bool = True, ) -> anndata.AnnData: """ Aggregate single-cell expression to cluster-level expression by `cluster_label`. Parameters ---------- adata : anndata.AnnData Single-cell AnnData. cluster_label : str Column in `adata.obs` defining clusters. scale : bool, default True If True, sums counts per cluster (proportional to cluster size). If False, takes mean per cluster. add_density : bool, default True If True, adds normalized cluster sizes to `.obs['cluster_density']`. Returns ------- aggregated : anndata.AnnData AnnData with one observation per cluster and the same variables as input. Notes ----- Only `cluster_label` is preserved in `.obs` of the returned AnnData (plus `cluster_density` if requested). """ try: value_counts = adata.obs[cluster_label].value_counts(normalize=True) except KeyError as e: raise ValueError("Provided label must belong to adata.obs.") from e unique_labels = value_counts.index new_obs = pd.DataFrame({cluster_label: unique_labels}) adata_ret = sc.AnnData(obs=new_obs, var=adata.var, uns=adata.uns) X_new = np.empty((len(unique_labels), adata.shape[1])) for index, l in enumerate(unique_labels): if not scale: X_new[index] = adata[adata.obs[cluster_label] == l].X.mean(axis=0) else: X_new[index] = adata[adata.obs[cluster_label] == l].X.sum(axis=0) adata_ret.X = X_new if add_density: adata_ret.obs["cluster_density"] = adata_ret.obs[cluster_label].map(lambda i: value_counts[i]) return adata_ret
[docs]def _edge_list_to_tensor(edge_list: Sequence[Sequence[int]]) -> torch.LongTensor: """ Convert a list of directed edges to a tensor of shape [2, E]. Parameters ---------- edge_list : sequence of (i, j) List-like edge pairs. Returns ------- edge_index : torch.LongTensor Shape [2, E]. Empty [2, 0] if input is empty. Notes ----- Validates shape and dtype; does not deduplicate edges. """ if edge_list is None or len(edge_list) == 0: return torch.empty((2, 0), dtype=torch.long) arr = np.asarray(edge_list, dtype=np.int64) if arr.ndim != 2 or arr.shape[1] != 2: raise ValueError("edge_list must be of shape [E, 2].") return torch.from_numpy(arr).long().t()
[docs]def _to_tuple_list(edges: Union[EdgeList, torch.Tensor, np.ndarray]) -> EdgeList: """ Normalize various edge representations to a list of (i, j) tuples. Parameters ---------- edges : list/array/tensor List of pairs [[i, j], ...] / [(i, j), ...] or a 2D tensor/array of shape [2, E] or [E, 2]. Returns ------- edge_list : List[Tuple[int, int]] List of directed edge tuples. Notes ----- Accepts torch or numpy arrays in either [2, E] or [E, 2] layout. """ if isinstance(edges, torch.Tensor): if edges.ndim != 2: raise ValueError("edge tensor must be 2D.") if edges.shape[0] == 2: arr = edges.t().detach().cpu().numpy() elif edges.shape[1] == 2: arr = edges.detach().cpu().numpy() else: raise ValueError("edge tensor must be [2, E] or [E, 2].") return [(int(i), int(j)) for i, j in arr] if isinstance(edges, np.ndarray): if edges.ndim != 2: raise ValueError("edge array must be 2D.") if edges.shape[0] == 2: arr = edges.T elif edges.shape[1] == 2: arr = edges else: raise ValueError("edge array must be [2, E] or [E, 2].") return [(int(i), int(j)) for i, j in arr] return [(int(e[0]), int(e[1])) for e in edges]
[docs]def get_multi_edge_index( pos: np.ndarray, regions: np.ndarray, graph_methods: str = "knn", n_neighbors: Optional[int] = None, n_radius: Optional[float] = None, verbose: bool = True, ) -> torch.LongTensor: """ Build intra-region graphs (no cross-region edges) and merge them. Parameters ---------- pos : np.ndarray Shape [N, d]. Coordinates. regions : np.ndarray Shape [N,]. Region label for each node. graph_methods : {"knn", "radius"}, default "knn" Graph construction method. n_neighbors : int, optional Required if `graph_methods == "knn"`. Number of neighbors (> 0). n_radius : float, optional Required if `graph_methods == "radius"`. Neighborhood radius (> 0). verbose : bool, default True If True, prints average directed neighbors per node. Returns ------- edge_index : torch.LongTensor Shape [2, E]. Directed edges (i, j). Empty [2, 0] if none. Notes ----- Uses PyG `knn_graph` or `radius_graph` per region and remaps indices to global node IDs before concatenation. """ if pos.shape[0] != regions.shape[0]: raise ValueError("pos and regions must have the same length") if graph_methods not in ["knn", "radius"]: raise ValueError("graph_methods must be either 'knn' or 'radius'") if graph_methods == "knn" and (n_neighbors is None or n_neighbors <= 0): raise ValueError("n_neighbors must be a positive integer for knn method") if graph_methods == "radius" and (n_radius is None or n_radius <= 0): raise ValueError("n_radius must be a positive value for radius method") edge_list = [] regions_unique = np.unique(regions) for reg in regions_unique: locs = np.where(regions == reg)[0] pos_region = pos[locs, :] if graph_methods == "knn": edge_index = knn_graph( torch.Tensor(pos_region), k=n_neighbors, batch=torch.LongTensor(np.zeros(pos_region.shape[0])), loop=True, ) elif graph_methods == "radius": edge_index = radius_graph( torch.Tensor(pos_region), r=n_radius, batch=torch.LongTensor(np.zeros(pos_region.shape[0])), loop=True, ) for (i, j) in zip(edge_index[1].numpy(), edge_index[0].numpy()): edge_list.append([locs[i], locs[j]]) if verbose: N = pos.shape[0] E = len(edge_list) avg = (E / N) if N > 0 else 0.0 print(f"Average neighbors per node (directed): {avg:.2f} (edges={E}, nodes={N})") if len(edge_list) == 0: return torch.empty((2, 0), dtype=torch.long) return torch.LongTensor(edge_list).T
[docs]def get_single_edge_index( pos: np.ndarray, graph_methods: str = "knn", n_neighbors: Optional[int] = None, n_radius: Optional[float] = None, verbose: bool = True, ) -> torch.LongTensor: """ Build a graph on a single region or the whole set. Parameters ---------- pos : np.ndarray Shape [N, d]. Coordinates. graph_methods : {"knn", "radius"}, default "knn" Graph construction method. n_neighbors : int, optional Required if `graph_methods == "knn"`. Number of neighbors (> 0). n_radius : float, optional Required if `graph_methods == "radius"`. Neighborhood radius (> 0). verbose : bool, default True If True, prints average directed neighbors per node. Returns ------- edge_index : torch.LongTensor Shape [2, E]. Directed edges (i, j). Empty [2, 0] if none. """ if graph_methods not in ["knn", "radius"]: raise ValueError("graph_methods must be either 'knn' or 'radius'") if graph_methods == "knn" and (n_neighbors is None or n_neighbors <= 0): raise ValueError("n_neighbors must be a positive integer for knn method") if graph_methods == "radius" and (n_radius is None or n_radius <= 0): raise ValueError("n_radius must be a positive value for radius method") edge_list = [] if graph_methods == "knn": edge_index = knn_graph( torch.Tensor(pos), k=n_neighbors, batch=torch.LongTensor(np.zeros(pos.shape[0])), loop=False, ) elif graph_methods == "radius": edge_index = radius_graph( torch.Tensor(pos), r=n_radius, batch=torch.LongTensor(np.zeros(pos.shape[0])), loop=False, ) for (i, j) in zip(edge_index[1].numpy(), edge_index[0].numpy()): edge_list.append([i, j]) if verbose: N = pos.shape[0] E = len(edge_list) avg = (E / N) if N > 0 else 0.0 print(f"Average neighbors per node (directed): {avg:.2f} (edges={E}, nodes={N})") if len(edge_list) == 0: return torch.empty((2, 0), dtype=torch.long) return torch.LongTensor(edge_list).T
[docs]def get_expr_edge_index( expr: np.ndarray, n_neighbors: int = 20, mode: str = "connectivity", metric: str = "correlation", include_self: bool = False, ) -> List[Tuple[int, int]]: """ Build a kNN graph from a feature/expression matrix using scikit-learn. Parameters ---------- expr : np.ndarray Shape [N, P]. Feature matrix. n_neighbors : int, default 20 Number of neighbors. mode : {"connectivity", "distance"}, default "connectivity" Graph construction mode. metric : str, default "correlation" Distance/affinity metric passed to `kneighbors_graph`. include_self : bool, default False Whether to include self-edges. Returns ------- edges : list of tuple Directed edges (row -> col) as a list of (i, j) pairs. Notes ----- Returns COO order from the sparse adjacency. """ adj = kneighbors_graph( expr, n_neighbors, mode=mode, metric=metric, include_self=include_self, ) edge_list = list(zip(adj.tocoo().row, adj.tocoo().col)) return edge_list
[docs]def edge_lists_intersection(edges1: Union[EdgeList, torch.Tensor], edges2: Union[EdgeList, torch.Tensor]) -> List[Tuple[int, int]]: """ Compute the direction-sensitive intersection of two edge sets. Parameters ---------- edges1 : list of (i, j) or torch.LongTensor First edge set. edges2 : list of (i, j) or torch.LongTensor Second edge set. Returns ------- edges : list of tuple Intersection as a list of directed edges. Notes ----- Converts inputs to tuple lists, then intersects as Python sets. """ def to_tuple_list(edges): if isinstance(edges, torch.Tensor): if edges.ndim != 2 or edges.shape[0] != 2: raise ValueError("Edge tensor must have shape [2, E].") arr = edges.t().cpu().numpy() return [tuple(map(int, e)) for e in arr] return [tuple(e) for e in edges] set1 = set(to_tuple_list(edges1)) set2 = set(to_tuple_list(edges2)) return list(set1 & set2)
[docs]def get_consensus_edges( spatial: np.ndarray, *omics: np.ndarray, target_neighbors: int = 8, max_iter: int = 20, ) -> torch.LongTensor: """ Intersect spatial and feature kNN graphs to target a desired average degree. Parameters ---------- spatial : np.ndarray Shape [N, d]. Coordinates. *omics : np.ndarray One or more matrices, each shape [N, p_k], concatenated for feature kNN. target_neighbors : int, default 8 Desired average number of neighbors in the intersection (0 < target < N). max_iter : int, default 20 Maximum number of binary-search iterations. Returns ------- edge_index : torch.LongTensor Shape [2, E]. Intersection edges (i, j). Empty [2, 0] if none. Notes ----- Binary-searches the neighbor count used for both spatial and feature graphs until the intersection's average degree approaches `target_neighbors`. """ n_cells = spatial.shape[0] assert all(omic.shape[0] == n_cells for omic in omics), "Omics data dimension mismatch" assert 0 < target_neighbors < n_cells, "Invalid target_neighbors" low, high = 4, min(80, n_cells - 1) combined_omics = np.hstack(omics) for _ in range(max_iter): n_neighbors = (low + high) // 2 edge_index_spatial = get_single_edge_index(spatial, n_neighbors=n_neighbors) edge_index_feat = get_expr_edge_index(combined_omics, n_neighbors=n_neighbors) edge_index_intersect = edge_lists_intersection(edge_index_spatial, edge_index_feat) avg_neighbors = len(edge_index_intersect) / n_cells if abs(avg_neighbors - target_neighbors) < 0.5: break elif avg_neighbors < target_neighbors: low = n_neighbors + 1 else: high = n_neighbors - 1 if low >= high: break edge_index = torch.LongTensor(edge_index_intersect).T if len(edge_index_intersect) > 0 \ else torch.empty((2, 0), dtype=torch.long) return edge_index
[docs]def tfidf(X: Union[np.ndarray, sparse.csr_matrix]) -> Union[np.ndarray, sparse.csr_matrix]: """ Apply TF-IDF normalization (Seurat v3-style). Parameters ---------- X : np.ndarray or sparse.csr_matrix Input matrix of shape [cells, features]. Returns ------- X_tfidf : np.ndarray or sparse.csr_matrix TF-IDF normalized matrix with the same sparsity type as input. Notes ----- idf = n_cells / feature_sum; tf is row-normalized counts. """ idf = X.shape[0] / X.sum(axis=0) if sparse.issparse(X): tf = X.multiply(1 / X.sum(axis=1)) return tf.multiply(idf) else: tf = X / X.sum(axis=1, keepdims=True) return tf * idf
[docs]def lsi( adata: anndata.AnnData, n_comps: int = 20, use_highly_variable: Optional[bool] = None, **kwargs: Any, ) -> anndata.AnnData: """ Compute LSI embeddings following the Seurat v3 approach. Parameters ---------- adata : anndata.AnnData Input AnnData. n_comps : int, default 20 Number of LSI dimensions. use_highly_variable : bool, optional If None, uses HVGs if available; otherwise all genes. **kwargs : Any Additional kwargs forwarded to `sklearn.utils.extmath.randomized_svd`. Returns ------- adata : anndata.AnnData Input AnnData with `adata.obsm["X_lsi"]` added. Notes ----- Applies TF-IDF, L1 normalization, log1p scaling, randomized SVD, and per-cell z-scoring (mean 0, std 1). """ if "random_state" not in kwargs: kwargs["random_state"] = 0 if use_highly_variable is None: use_highly_variable = "highly_variable" in adata.var adata_use = adata[:, adata.var["highly_variable"]] if use_highly_variable else adata X = tfidf(adata_use.X) X_norm = normalize(X, norm="l1") X_norm = np.log1p(X_norm * 1e4) X_lsi = sklearn.utils.extmath.randomized_svd(X_norm, n_comps, **kwargs)[0] X_lsi -= X_lsi.mean(axis=1, keepdims=True) X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True) adata.obsm["X_lsi"] = X_lsi return adata
[docs]def _optimize_cluster( adata: anndata.AnnData, resolution: List[float] = list(np.arange(0.01, 2.5, 0.01)), ) -> float: """ Optimize Leiden resolution by maximizing the Calinski–Harabasz score. Parameters ---------- adata : anndata.AnnData AnnData whose `.X` (samples × features) is used for scoring. This function overwrites `adata.obs["leiden"]` during the search. resolution : list of float, default np.arange(0.01, 2.5, 0.01) Candidate resolutions. Returns ------- res : float Best resolution (prints it as well). Notes ----- Runs `sc.tl.leiden` for each resolution and computes CH score on `.X`. """ scores = [] for r in resolution: sc.tl.leiden(adata, resolution=r, flavor="igraph", n_iterations=2, directed=False) s = calinski_harabasz_score(adata.X, adata.obs["leiden"]) scores.append(s) cl_opt_df = pd.DataFrame({"resolution": resolution, "score": scores}) best_idx = int(np.argmax(cl_opt_df["score"])) res = float(cl_opt_df.iloc[best_idx, 0]) print("Best resolution: ", res) return res
[docs]def _priori_cluster( adata: anndata.AnnData, eval_cluster_n: int = 7, res_min: float = 0.01, res_max: float = 2.5, res_step: float = 0.01, ) -> float: """ Find a Leiden resolution that yields a target number of clusters. Parameters ---------- adata : anndata.AnnData AnnData to be clustered. Overwrites `adata.obs["leiden"]`. eval_cluster_n : int, default 7 Desired number of Leiden clusters. res_min : float, default 0.01 Minimum resolution to try (inclusive). res_max : float, default 2.5 Maximum resolution to try (inclusive). res_step : float, default 0.01 Step size between consecutive resolutions. Returns ------- res : float Resolution that first (from high to low) yields exactly `eval_cluster_n`, or the last tried value if none match. Notes ----- Search order is descending to mirror original behavior. """ if res_step <= 0: raise ValueError("res_step must be > 0") if res_max < res_min: raise ValueError("res_max must be >= res_min") resolutions = np.arange(res_min, res_max, res_step) if resolutions.size == 0: raise ValueError("Empty resolution grid. Check res_min/res_max/res_step.") for res in sorted(resolutions.tolist(), reverse=True): sc.tl.leiden(adata, resolution=res, flavor="igraph", n_iterations=2, directed=False) count_unique_leiden = len(pd.DataFrame(adata.obs['leiden']).leiden.unique()) if count_unique_leiden == int(eval_cluster_n): break print("Best resolution: ", res) return res
[docs]def mclust_R( adata: anndata.AnnData, num_cluster: int, modelNames: str = 'EEE', used_obsm: str = "DIRAC_embed", random_seed: int = 2020, key_added: str = "DIRAC", ) -> None: """ Cluster embeddings using R's `mclust` via `rpy2`. Parameters ---------- adata : anndata.AnnData AnnData with embeddings in `adata.obsm[used_obsm]` of shape [N, d]. num_cluster : int Number of clusters for `Mclust`. modelNames : str, default 'EEE' Covariance model string (see mclust docs). used_obsm : str, default 'DIRAC_embed' Key in `adata.obsm` to cluster. random_seed : int, default 2020 Random seed for NumPy and R. key_added : str, default 'DIRAC' Column name to store cluster labels in `adata.obs`. Returns ------- None Notes ----- Requires `rpy2` and R package `mclust`. Writes integer labels to `adata.obs[key_added]` as categorical. """ np.random.seed(random_seed) import rpy2.robjects as robjects robjects.r.library("mclust") import rpy2.robjects.numpy2ri rpy2.robjects.numpy2ri.activate() r_random_seed = robjects.r['set.seed'] r_random_seed(random_seed) rmclust = robjects.r['Mclust'] res = rmclust(rpy2.robjects.numpy2ri.numpy2rpy(adata.obsm[used_obsm]), num_cluster, modelNames) mclust_res = np.array(res[-2]) adata.obs[key_added] = mclust_res adata.obs[key_added] = adata.obs[key_added].astype('int') adata.obs[key_added] = adata.obs[key_added].astype('category')
[docs]def seed_torch(seed: int = 1029) -> None: """ Set random seeds for Python, NumPy, and PyTorch (CPU & CUDA) for reproducibility. Parameters ---------- seed : int, default 1029 Seed value used across Python's `random`, NumPy, and PyTorch. Returns ------- None Notes ----- Sets: - os.environ['PYTHONHASHSEED'] - torch.manual_seed / cuda.manual_seed / cuda.manual_seed_all - torch.backends.cudnn.benchmark = False - torch.backends.cudnn.deterministic = True """ random.seed(seed) os.environ['PYTHONHASHSEED'] = str(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) torch.backends.cudnn.benchmark = False torch.backends.cudnn.deterministic = True
[docs]def combine_multimodal_adatas( adatas: Dict[str, anndata.AnnData], *, prefixes: Optional[Dict[str, str]] = None, align_obs: bool = True, preserve_obsm: Iterable[str] = ("spatial",), dtype: np.dtype = np.float32, ) -> anndata.AnnData: """ Concatenate features across modalities for the same set of cells. Parameters ---------- adatas : Dict[str, AnnData] Mapping from modality name (e.g., "RNA", "ATAC", "ADT") to AnnData. Dict insertion order determines feature block order. prefixes : Dict[str, str], optional Per-modality prefixes for feature names (default: f"{mod.upper()}_"). align_obs : bool, default True If True, reindex each AnnData to match the first modality's cells. If False, raises on mismatch. preserve_obsm : Iterable[str], default ("spatial",) Keys in `.obsm` to copy from the first modality if present. dtype : np.dtype, default np.float32 Output matrix dtype. Returns ------- combined : anndata.AnnData AnnData with dense `X` (features concatenated), `.var` describing feature types and original names, `.obs` from the reference modality, and `.uns` with combination metadata. Notes ----- Converts all blocks to dense arrays before horizontal concatenation. """ if not adatas: raise ValueError("`adatas` is empty. Provide at least one AnnData.") modalities = list(adatas.keys()) ref_key = modalities[0] ref = adatas[ref_key] ref_index = ref.obs.index for k, ad in list(adatas.items()): if align_obs: if not ref_index.equals(ad.obs.index): try: adatas[k] = ad[ref_index].copy() except KeyError: raise ValueError( f"Cell sets differ between '{ref_key}' and '{k}'. Cannot align." ) from None else: if not ref_index.equals(ad.obs.index): raise ValueError( f"Observation names between '{ref_key}' and '{k}' do not match in the same order." ) dense_blocks = [] var_index = [] feature_types = [] original_features = [] for mod in modalities: ad = adatas[mod] X = ad.X if hasattr(X, "toarray"): X = X.toarray() X = np.asarray(X, dtype=dtype, order="C") dense_blocks.append(X) pref = (prefixes or {}).get(mod, f"{mod.upper()}_") names = [f"{pref}{n}" for n in ad.var_names.astype(str)] var_index.extend(names) feature_types.extend([mod] * ad.n_vars) original_features.extend(ad.var_names.astype(str).tolist()) combined_X = np.hstack(dense_blocks).astype(dtype, copy=False) combined_var = pd.DataFrame( { "feature_types": feature_types, "original_feature": original_features, }, index=pd.Index(var_index, name="feature"), ) combined = anndata.AnnData( X=combined_X, obs=adatas[ref_key].obs.copy(), var=combined_var, uns={ "modality_combine": "+".join(modalities), "modalities": {m: adatas[m].n_vars for m in modalities}, }, ) for key in preserve_obsm: if key in ref.obsm: combined.obsm[key] = ref.obsm[key].copy() return combined
[docs]def ctg( adata_sc: anndata.AnnData, cluster_label: str, n_genes: int = 150, *, min_cells: int = 3, method: str = "wilcoxon", use_raw: bool = False, ) -> List[str]: """ Select top marker genes per cluster using Scanpy's `rank_genes_groups`. Parameters ---------- adata_sc : anndata.AnnData Single-cell AnnData with expression matrix and metadata. cluster_label : str Column in `adata_sc.obs` with cluster identities. n_genes : int, default 150 Number of top-ranked genes to collect per cluster before de-duplication. min_cells : int, keyword-only, default 3 Minimum cells a gene must be expressed in prior to ranking. method : {"wilcoxon", "t-test", "logreg"}, keyword-only, default "wilcoxon" Statistical test for differential expression. use_raw : bool, keyword-only, default False Whether to use `adata.raw` for testing. Returns ------- markers : List[str] Unique list of top marker genes across clusters. Notes ----- Works on a copy of the input AnnData to avoid in-place modifications. Applies minimal preprocessing (filter, normalize_total, log1p) before DE. """ if cluster_label not in adata_sc.obs.columns: raise ValueError( f"`cluster_label` '{cluster_label}' not found in adata_sc.obs. " f"Available columns: {list(adata_sc.obs.columns)}" ) if n_genes <= 0: raise ValueError("`n_genes` must be a positive integer.") if method not in {"wilcoxon", "t-test", "logreg"}: raise ValueError("`method` must be one of: 'wilcoxon', 't-test', 'logreg'.") adata = adata_sc.copy() sc.pp.filter_genes(adata, min_cells=min_cells) sc.pp.normalize_total(adata) sc.pp.log1p(adata) sc.tl.rank_genes_groups( adata, groupby=cluster_label, method=method, use_raw=use_raw, ) names = adata.uns["rank_genes_groups"]["names"] markers_df = pd.DataFrame(names) top_n = min(n_genes, markers_df.shape[0]) top_df = markers_df.iloc[:top_n, :] unique_markers = list(np.unique(top_df.to_numpy().ravel())) return unique_markers