Source code for spacecore.backend.numpy._ops

from __future__ import annotations

from typing import Any, Sequence, Tuple, Literal, Type, cast

from .._family import BackendFamily
from .._eager import EagerControlFlowMixin
from .._ops import BackendOps
from ...types import ArrayLike, DenseArray, SparseArray, DType, Index


[docs] class NumpyOps(EagerControlFlowMixin, BackendOps): """ BackendOps implementation for the NumPy/SciPy ecosystem. This backend uses NumPy for dense array operations and SciPy for sparse array operations. Dense arrays numpy.ndarray Sparse arrays scipy.sparse.spmatrix scipy.sparse.sparray Methods ------- Most methods mirror the corresponding NumPy or SciPy signatures and delegate directly to NumPy/SciPy implementations. Backend-specific behavior, dtype promotion, broadcasting, memory layout, and error modes therefore follow NumPy/SciPy semantics. Backend handles - np : module NumPy module stored on the class and available through instances as `ops.np`. Advanced users may use it when SpaceCore's portable API does not expose a required NumPy feature. - sp : module SciPy module stored on the class and available through instances as `ops.sp`. Advanced users may use it for SciPy-specific functionality. Notes ----- Code intended to remain backend-portable should prefer `BackendOps` methods. Direct use of `ops.np` or `ops.sp` is an explicit NumPy/SciPy-specific escape hatch. NumPy's `device` keyword is present for Array API interoperability. When supplied, it must be `"cpu"` or `None`; see the corresponding NumPy documentation for each method. """ import numpy as _np import scipy as _sp import array_api_compat.numpy as xp # Concrete library handles exposed as ``Any`` so the portable protocols # can flow into typed NumPy/SciPy calls without per-boundary casts; mirrors # the base ``xp: ClassVar[Any]`` design. np: Any = _np sp: Any = _sp _family = BackendFamily.numpy.value.lower() _allow_sparse = True def __init__(self) -> None: super().__init__()
[docs] def free_memory_bytes(self) -> int | None: """Return available system RAM in bytes. For a CPU backend the "device" is host memory, so this reports the available physical RAM via :mod:`psutil`, a required dependency. The kernel dispatcher (ADR-016) uses it as the memory budget that gates a materializing fast path. """ import psutil return int(psutil.virtual_memory().available)
@property def dense_array(self) -> Type[Any]: """ Dense array type using NumPy. Returns ------- Concrete dense array class accepted by this backend. See: https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html """ return self.np.ndarray @property def sparse_array(self) -> Tuple[Type[Any], ...]: """ Sparse array type tuple using SciPy. Returns ------- Concrete sparse array classes accepted by this backend, or None. See: https://docs.scipy.org/doc/scipy/reference/sparse.html """ sparse = self.sp.sparse types: list[type[Any]] = [] if hasattr(sparse, "spmatrix"): types.append(sparse.spmatrix) if hasattr(sparse, "sparray"): types.append(sparse.sparray) return tuple(types)
[docs] def sanitize_dtype(self, dtype: DType | None) -> DType: """ Normalize a dtype specifier using NumPy. Input: dtype: Optional dtype requested by SpaceCore or the caller. Output: Backend dtype object accepted by array constructors. See: https://numpy.org/doc/stable/reference/generated/numpy.dtype.html """ if dtype is None: return self.np.float64 return self.np.dtype(dtype)
[docs] def assparse( self, x: Any, *, format: Literal["csr", "csc", "coo"] = "csr", dtype: DType | None = None ) -> SparseArray: """ Convert input to a sparse array using SciPy. Input: x: Dense, sparse, or array-like input plus sparse-format options. Output: Sparse backend array. See: https://docs.scipy.org/doc/scipy/reference/sparse.html Backend-specific notes: SpaceCore currently converts dense inputs to 2-D SciPy sparse matrices in the requested format. """ sparse = self.sp.sparse self._reject_complex_to_real(x, dtype, operation="assparse") if self.is_sparse(x): if dtype is not None and self.get_dtype(x) != self.sanitize_dtype(dtype): x = x.astype(self.sanitize_dtype(dtype)) if format == "csr": return x.tocsr() if format == "csc": return x.tocsc() if format == "coo": return x.tocoo() raise ValueError(f"Unknown sparse format: {format!r}") x_arr = self.asarray(x, dtype=dtype) if x_arr.ndim != 2: raise ValueError("NumPy/SciPy sparse conversion currently expects a 2D array.") if format == "csr": return sparse.csr_matrix(x_arr) if format == "csc": return sparse.csc_matrix(x_arr) if format == "coo": return sparse.coo_matrix(x_arr) raise ValueError(f"Unknown sparse format: {format!r}")
[docs] def sparse_matmul(self, a: SparseArray, b: DenseArray) -> DenseArray: """ Multiply sparse and dense arrays using SciPy. Input: a: Sparse backend array; b: Dense backend array. Output: Dense backend array containing the product. See: https://docs.scipy.org/doc/scipy/reference/sparse.html Backend-specific notes: Uses SciPy sparse multiplication before returning a dense NumPy result when applicable. """ if not self.is_sparse(a): raise TypeError("sparse_matmul expects `a` to be a SciPy sparse matrix/array.") if not self.is_dense(b): raise TypeError("sparse_matmul expects `b` to be a Numpy dense object.") return a @ b
[docs] def logsumexp( self, a: DenseArray, axis: int | Sequence[int] | None = None, b: DenseArray | None = None, keepdims: bool = False, return_sign: bool = False, ) -> DenseArray | Tuple[DenseArray, DenseArray]: """ Compute a stable log-sum-exp reduction using SciPy. Input: a: Dense backend array; axis, weights, and sign options control the reduction. Output: Dense backend array or tuple containing log-sum-exp results. See: https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.logsumexp.html """ return self.sp.special.logsumexp( a, axis=axis, b=b, keepdims=keepdims, return_sign=return_sign )
def _copy(self, x: DenseArray) -> DenseArray: """Return a NumPy copy of ``x`` (mutation primitive for index ops).""" return cast(Any, x).copy() def _scatter_add_inplace(self, y: DenseArray, index: Index, values: ArrayLike) -> None: """Accumulate ``values`` into ``y`` at ``index`` via ``numpy.add.at``.""" self.np.add.at(y, index, values)
[docs] def ix_(self, *args: Any) -> Any: r""" Build open mesh index arrays using NumPy. Input: args: One-dimensional index arrays or sequences. Output: Tuple of dense backend arrays usable for open-mesh indexing. See: https://numpy.org/doc/stable/reference/generated/numpy.ix\\_.html """ return self.np.ix_(*args)
[docs] def allclose_sparse( self, a: SparseArray, b: SparseArray, rtol: float = 1e-5, atol: float = 1e-8, ) -> bool: """ Compare sparse arrays elementwise within tolerances using SciPy. Input: a, b: Sparse backend arrays; rtol and atol configure comparison. Output: Boolean indicating whether sparse arrays are close. See: https://docs.scipy.org/doc/scipy/reference/sparse.html Backend-specific notes: Sparse inputs are converted to CSR and compared by logical sparse difference. """ self._require_two_sparse(a, b) a_csr = cast(Any, a).tocsr() b_csr = cast(Any, b).tocsr() if a_csr.shape != b_csr.shape: return False diff = (a_csr - b_csr).tocsr() if diff.nnz == 0: return True # NaN differences are never "close": ``abs(NaN) > tol`` is False, which # would otherwise mask a NaN-vs-finite (or NaN-vs-NaN) entry as equal and # wrongly report two different operators as close. This matches the # torch/cupy paths (dense ``allclose`` with ``equal_nan=False``). if self.np.isnan(diff.data).any(): return False a_abs = abs(a_csr).tocsr() b_abs = abs(b_csr).tocsr() scale = self.sp.sparse.csr_matrix.maximum(a_abs, b_abs) # tolerance_ij = atol + rtol * max(|a_ij|, |b_ij|) tol = scale.multiply(rtol) tol.data += atol bad = abs(diff) > tol return bad.nnz == 0