# ruff: noqa: F722, F821
from abc import ABC, abstractmethod
import numpy as np
from jaxtyping import Float
from mini_gpr.kernels import Kernel
from mini_gpr.solvers import LinearSolver, vanilla
from mini_gpr.utils import ensure_1d, ensure_2d, get_rng
class Model(ABC):
def __init__(
self,
kernel: Kernel,
noise: float = 1e-8,
solver: LinearSolver = vanilla,
):
self.kernel = kernel
self.noise = abs(noise)
self.solver = solver
@abstractmethod
def fit(self, X: Float[np.ndarray, "N D"], y: Float[np.ndarray, "N"]):
"""
Fit the model to the data.

        Parameters
----------
X
the data points.
y
the target values.
"""
@abstractmethod
def predict(self, T: Float[np.ndarray, "T D"]) -> Float[np.ndarray, "T"]:
"""
Get the predictive mean of the function at the given locations.

        Parameters
----------
T
the data points/locations at which to make predictions
"""
@abstractmethod
def latent_uncertainty(
self, T: Float[np.ndarray, "T D"]
) -> Float[np.ndarray, "T"]:
"""
Get the latent uncertainty of the function at the given locations.

        Parameters
----------
T
the data points/locations at which to get the latent uncertainty
"""
def predictive_uncertainty(
self, T: Float[np.ndarray, "T D"]
) -> Float[np.ndarray, "T"]:
r"""
Get the predictive uncertainty of the function at the given locations.

        Parameters
----------
T
the data points/locations at which to get the predictive uncertainty
"""
return (self.latent_uncertainty(T) ** 2 + self.noise**2) ** 0.5
@property
@abstractmethod
def log_likelihood(self) -> float:
"""
Get the log marginal likelihood of the model conditioned on the
training data.
"""
@abstractmethod
    def with_new(self, kernel: Kernel, noise: float) -> "Model":
        """Return a new, unfitted model of the same kind with the given kernel and noise."""
@ensure_2d("locations")
def sample_prior(
self,
locations: Float[np.ndarray, "N D"],
n_samples: int = 1,
*,
rng: np.random.RandomState | int | None = None,
jitter: float = 1e-8,
    ) -> Float[np.ndarray, "N n"]:
"""
Generate samples from the model's prior distribution.

        Parameters
----------
locations
the data points/locations at which to generate samples
n_samples
the number of samples to generate
rng
the random number generator to use
jitter
a small value to add to the diagonal of the kernel matrix to ensure
numerical stability
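
        Example
        -------
        A minimal sketch, assuming `model` is any `Model` instance and
        `locations` is an `(N, D)` array of query points:

        >>> samples = model.sample_prior(locations, n_samples=5, rng=0)  # (N, 5)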
"""
N = locations.shape[0]
rng = get_rng(rng)
K = self.kernel(locations, locations) + np.eye(N) * jitter
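        # colour i.i.d. standard normal draws with the Cholesky factor of the
        # (jittered) prior covariance, so each column is a sample from N(0, K)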
L = np.linalg.cholesky(K)
Z = rng.randn(N, n_samples)
return L @ Z
@abstractmethod
def sample_posterior(
self,
locations: Float[np.ndarray, "N D"],
n_samples: int = 1,
*,
rng: np.random.RandomState | int | None = None,
jitter: float = 1e-6,
    ) -> Float[np.ndarray, "N n"]:
"""
Generate samples from the model's posterior distribution.

        Parameters
----------
locations
the data points/locations at which to generate samples
n_samples
the number of samples to generate
rng
the random number generator to use
jitter
a small value to add to the diagonal of the kernel matrix to ensure
numerical stability
"""
def __repr__(self):
name = self.__class__.__name__
return f"{name}(kernel={self.kernel}, noise={self.noise:.2e})"
class GPR(Model):
"""
Full-rank Gaussian Process Regression model.

    Parameters
----------
kernel: Kernel
defines the covariance between data points.
noise: float
the aleatoric noise assumed to be present in the data.
solver: LinearSolver
solver of linear systems of the form `A @ x = y`.

    Example
-------
>>> from mini_gpr.kernels import RBF
>>> from mini_gpr.models import GPR
>>> model = GPR(kernel=RBF(), noise=1e-3)
>>> model.fit(X, y)
>>> predictions = model.predict(T) # (T,)
>>> uncertainties = model.predictive_uncertainty(T) # (T,)
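    >>> samples = model.sample_posterior(T, n_samples=3, rng=0)  # (T, 3)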
"""
@ensure_1d("y")
@ensure_2d("X")
def fit(self, X: Float[np.ndarray, "N D"], y: Float[np.ndarray, "N"]):
self.X = X
self.K_XX = self.kernel(X, X) + self.noise**2 * np.eye(len(X))
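        # representer weights: c = (K_XX + σ²I)^(-1) y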
self.c = self.solver(self.K_XX, y).flatten()
self.y = y
@ensure_2d("T")
def predict(self, T: Float[np.ndarray, "T D"]) -> Float[np.ndarray, "T"]:
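        # posterior mean: μ(T) = K_TX (K_XX + σ²I)^(-1) y = K_XT^T c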
K_XT = self.kernel(self.X, T) # (A, B)
return np.einsum("ab,a->b", K_XT, self.c) # (B)
@ensure_2d("T")
def latent_uncertainty(
self, T: Float[np.ndarray, "T D"]
) -> Float[np.ndarray, "T"]:
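        # latent variance: k(t, t) - k_Xt^T (K_XX + σ²I)^(-1) k_Xt at each location t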
K_XT = self.kernel(self.X, T) # (A, B)
K_TT_diag = np.diag(self.kernel(T, T)) # (B,)
v = self.solver(self.K_XX, K_XT) # (A, B)
var = K_TT_diag - np.einsum("ab,ab->b", K_XT, v)
var = np.maximum(var, 0.0) # Numerical stability
return var**0.5
@property
def log_likelihood(self) -> float:
n = len(self.y)
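        # log p(y|X) = -1/2 [ y^T (K+σ²I)^(-1) y + log|K+σ²I| + n log(2π) ]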
# quadratic term: y^T (K+σ²I)^(-1) y
quad = np.dot(self.y, self.c)
# log determinant of covariance matrix
sign, logdet = np.linalg.slogdet(self.K_XX)
if sign <= 0:
raise np.linalg.LinAlgError(
"Kernel matrix is not positive definite. "
"Try gradually increasing the noise."
)
return (-0.5 * quad - 0.5 * logdet - 0.5 * n * np.log(2 * np.pi)).item()
def with_new(self, kernel: Kernel, noise: float) -> "GPR":
return GPR(kernel, noise, self.solver)
@ensure_2d("locations")
def sample_posterior(
self,
locations: Float[np.ndarray, "N D"],
n_samples: int = 1,
*,
rng: np.random.RandomState | int | None = None,
jitter: float = 1e-6,
    ) -> Float[np.ndarray, "N n"]:
rng = get_rng(rng)
N = locations.shape[0]
mu = self.predict(locations)
K_XT = self.kernel(self.X, locations)
v = self.solver(self.K_XX, K_XT)
K_TT = self.kernel(locations, locations)
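        # posterior covariance: K_TT - K_TX (K_XX + σ²I)^(-1) K_XT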
cov = K_TT - K_XT.T @ v
# Force symmetry & numerical stability
cov = 0.5 * (cov + cov.T)
L = np.linalg.cholesky(cov + np.eye(N) * jitter)
Z = rng.randn(N, n_samples)
return (L @ Z) + mu[:, None]
class SoR(Model):
"""
Subset of Regressors low-rank Gaussian Process Regression approximation.

    Parameters
----------
kernel
defines the covariance between data points.
sparse_points
the inducing points.
noise
the aleatoric noise assumed to be present in the data.
solver
solver of linear systems of the form `A @ x = y`.

    Example
-------
    >>> import numpy as np
    >>> from mini_gpr.kernels import RBF
    >>> from mini_gpr.models import SoR
>>> model = SoR(
... kernel=RBF(), sparse_points=np.random.rand(10, 2), noise=1e-3
... )
>>> model.fit(X, y)
>>> predictions = model.predict(T) # (T,)
>>> uncertainties = model.latent_uncertainty(T) # (T,)
"""
def __init__(
self,
kernel: Kernel,
sparse_points: Float[np.ndarray, "M D"],
noise: float = 1e-8,
solver: LinearSolver = vanilla,
):
super().__init__(kernel, noise, solver)
self.M = sparse_points
def with_new(self, kernel: Kernel, noise: float) -> "SoR":
return self.__class__(
kernel,
sparse_points=self.M,
noise=noise,
solver=self.solver,
)
@ensure_2d("X")
def fit(self, X: Float[np.ndarray, "A D"], y: Float[np.ndarray, "A"]):
# compute kernel matrices
K_MX = self.kernel(self.M, X)
K_MM = self.kernel(self.M, self.M)
# store necessary components for prediction
self.y = y
self.K_MX = K_MX
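        # (K_MX K_XM + σ² K_MM)^(-1), the M×M matrix reused by predict and the
        # uncertainty/likelihood computations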
self.inv_matrix = self.solver(
K_MX @ K_MX.T + self.noise**2 * K_MM,
np.eye(len(self.M)),
)
self.K_MM = K_MM
@ensure_2d("T")
def predict(self, T: Float[np.ndarray, "T D"]) -> Float[np.ndarray, "T"]:
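        # SoR mean: μ(T) = K_TM (K_MX K_XM + σ² K_MM)^(-1) K_MX y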
K_TM = self.kernel(T, self.M) # (T, M)
temp = self.inv_matrix @ (self.K_MX @ self.y) # (M,)
return K_TM @ temp # (T,)
@ensure_2d("T")
def latent_uncertainty(
self, T: Float[np.ndarray, "T D"]
) -> Float[np.ndarray, "T"]:
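        # SoR variance: diag(K_TT) - diag(K_TM K_MM^(-1) K_MT)
        #               + σ² diag(K_TM B^(-1) K_MT), with B = K_MX K_XM + σ² K_MM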
# Compute required kernel matrices
K_TM = self.kernel(T, self.M)
K_TT_diag = np.diag(self.kernel(T, T))
var = K_TT_diag.copy()
K_MM_inv_K_MT = self.solver(self.K_MM, K_TM.T)
var -= np.einsum("tm,mt->t", K_TM, K_MM_inv_K_MT)
temp = K_TM @ self.inv_matrix @ K_TM.T
var += self.noise**2 * np.diag(temp)
# Ensure numerical stability
var = np.maximum(var, 0.0)
return var**0.5
@property
def log_likelihood(self) -> float:
n = len(self.y)
m = len(self.M)
sigma2 = float(self.noise**2)
if sigma2 <= 0.0:
raise ValueError(
"Noise variance must be positive for the likelihood."
)
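        # Under SoR the marginal covariance is Σ = K_XM K_MM^(-1) K_MX + σ²I;
        # the quadratic term below uses the Woodbury identity and the
        # log-determinant uses the matrix determinant lemma, both via the m×m matrix B.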
# --- Quadratic term: y^T Σ^{-1} y ---
# t = K_MX y
t = self.K_MX @ self.y # (m,)
# u = (K_MX K_MX^T + σ² K_MM)^{-1} (K_MX y)
u = self.inv_matrix @ t # (m,)
# y^T Σ^{-1} y = (1/σ²) [ y^T y - t^T u ]
quad = (self.y @ self.y - t @ u) / sigma2
# --- log|Σ| via determinant lemma ---
# B = σ² K_MM + K_MX K_XM (same matrix inverted in 'inv_matrix')
B = self.K_MX @ self.K_MX.T + sigma2 * self.K_MM
sign_B, logdet_B = np.linalg.slogdet(B)
sign_K, logdet_K = np.linalg.slogdet(self.K_MM)
if sign_B <= 0 or sign_K <= 0:
raise np.linalg.LinAlgError(
"Kernel matrix is not positive definite. "
"Try gradually increasing the noise."
)
logdet_Sigma = (n - m) * np.log(sigma2) - logdet_K + logdet_B
return (-0.5 * (quad + logdet_Sigma + n * np.log(2 * np.pi))).item()
@ensure_2d("locations")
def sample_posterior(
self,
locations: Float[np.ndarray, "N D"],
n_samples: int = 1,
*,
rng: np.random.RandomState | int | None = None,
jitter: float = 1e-6,
) -> Float[np.ndarray, "N n"]:
rng = get_rng(rng)
# Posterior mean
mu = self.predict(locations) # (N,)
# --- Compute posterior covariance ---
K_TM = self.kernel(locations, self.M) # (N, M)
K_TT = self.kernel(locations, locations) # (N, N)
# First subtract Nyström term: K_TM K_MM^{-1} K_MT
K_MM_inv_K_MT = self.solver(self.K_MM, K_TM.T) # (M, N)
cov = K_TT - K_TM @ K_MM_inv_K_MT # (N, N)
# Add correction term from the noise-adjusted projection
# temp = K_TM @ inv_matrix @ K_TM.T
cov += self.noise**2 * (K_TM @ self.inv_matrix @ K_TM.T)
# Symmetrize for stability
cov = 0.5 * (cov + cov.T)
# --- Cholesky with jitter ---
L = np.linalg.cholesky(cov + np.eye(cov.shape[0]) * jitter)
# --- Draw samples ---
N = cov.shape[0]
Z = rng.randn(N, n_samples) # (N, n_samples)
return (L @ Z) + mu[:, None]