# Source code for kiez.hubness_reduction.mutual_proximity
# SPDX-License-Identifier: BSD-3-Clause
# adapted from skhubness: https://github.com/VarIr/scikit-hubness/
from typing import Tuple, TypeVar
import numpy as np
from scipy import stats
from sklearn.utils.validation import check_is_fitted
from tqdm.auto import tqdm
from .base import HubnessReduction
T = TypeVar("T")
try:
import torch
from torch.distributions.normal import Normal
except ImportError:
torch = None
class MutualProximity(HubnessReduction):
    """Hubness reduction with Mutual Proximity.

    Uses the formula presented in [1]_.

    Parameters
    ----------
    method: 'normal' or 'empiric', default = 'normal'
        Model distance distribution with 'method'.

        - 'normal' or 'gaussi' model distance distributions with independent Gaussians (fast)
        - 'empiric' or 'exact' model distances with the empiric distributions (slow)
    verbose: int, default = 0
        If verbose > 0, show progress bar.

    References
    ----------
    .. [1] Schnitzer, D., Flexer, A., Schedl, M., & Widmer, G. (2012).
           Local and global scaling reduce hubs in space. The Journal of Machine
           Learning Research, 13(1), 2871-2902.
    """

    def __init__(self, method: str = "normal", **kwargs):
        """Store the normalized method name; remaining kwargs go to the base class.

        Raises
        ------
        ValueError
            If ``method`` is not one of 'exact', 'empiric', 'normal', 'gaussi'.
        """
        super().__init__(**kwargs)
        # Collapse the accepted aliases onto the two canonical names that the
        # rest of the class dispatches on.
        if method in ("exact", "empiric"):
            self.method = "empiric"
        elif method in ("normal", "gaussi"):
            self.method = "normal"
        else:
            raise ValueError(
                f'Mutual proximity method "{method}" not recognized. Try "normal"'
                ' or "empiric".'
            )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(method = {self.method}, verbose ="
            f" {self.verbose})"
        )

    def _fit(
        self,
        neigh_dist,
        neigh_ind,
        source,
        target,
    ) -> "MutualProximity":
        """Fit the model using neigh_dist and neigh_ind as training data.

        Parameters
        ----------
        neigh_dist: np.ndarray, shape (n_samples, n_neighbors)
            Distance matrix of training objects (rows) against their
            individual k nearest neighbors (columns).
        neigh_ind: np.ndarray, shape (n_samples, n_neighbors)
            Neighbor indices corresponding to the values in neigh_dist.
        source
            Ignored
        target
            Ignored

        Returns
        -------
        MutualProximity

        Raises
        ------
        ValueError
            If self.method is unknown
        """
        self.n_train = neigh_dist.shape[0]

        if self.method == "empiric":
            # The empiric variant needs the raw neighbor lists at transform time.
            self.neigh_dist_t_to_s_ = neigh_dist
            self.neigh_ind_t_to_s_ = neigh_ind
        elif self.method == "normal":
            # The Gaussian variant only needs per-row mean and standard deviation.
            if self._use_torch:
                self.mu_t_to_s_ = torch.nanmean(neigh_dist, dim=1)
            else:
                self.mu_t_to_s_ = np.nanmean(neigh_dist, axis=1)
            self.sd_t_to_s_ = self._nanstd(neigh_dist)
        else:
            # Only reachable if self.method was changed after construction.
            raise ValueError(
                f"Internal: Invalid method {self.method}. Try 'normal' or 'empiric'."
            )

        return self

    # NOTE(review): self._use_torch is not defined in this class; presumably it
    # is provided by the HubnessReduction base -- confirm.
    def _nanstd(self, value):
        """Return the NaN-ignoring population standard deviation of each row.

        Both backends use the same estimator (divide by the number of non-NaN
        entries, i.e. ddof=0), so torch and NumPy inputs give matching results.
        ``torch.std`` is avoided because it propagates NaN and applies Bessel's
        correction by default, unlike ``np.nanstd``.
        """
        if self._use_torch:
            centered = value - torch.nanmean(value, dim=1, keepdim=True)
            return torch.sqrt(torch.nanmean(centered**2, dim=1))
        return np.nanstd(value, axis=1)

    def _zeros(self, value):
        """Return a zero-filled array/tensor of shape ``value`` for the active backend."""
        if self._use_torch:
            return torch.zeros(value)
        return np.zeros(value)

    def _empty_like(self, value):
        """Return an uninitialized array/tensor shaped like ``value``."""
        if self._use_torch:
            return torch.empty_like(value)
        return np.empty_like(value)

    def _sum(self, value, axis):
        """Sum ``value`` along ``axis`` with the active backend."""
        if self._use_torch:
            return torch.sum(value, axis=axis)
        return np.sum(value, axis=axis)

    def _numel(self, value):
        """Return the total number of elements in ``value``."""
        if self._use_torch:
            return value.numel()
        return value.size

    def _transform_normal(self, neigh_dist, neigh_ind):
        """Compute Mutual Proximity with independent Gaussians."""
        # Fitted statistics of the training points each query row refers to.
        mu_t_to_s = self.mu_t_to_s_[neigh_ind]
        sd_t_to_s = self.sd_t_to_s_[neigh_ind]
        # Per-query statistics as column vectors so they broadcast over neighbors.
        sd = self._nanstd(neigh_dist).reshape(-1, 1)

        if self._use_torch:
            mu = torch.nanmean(neigh_dist, dim=1).reshape(-1, 1)
            # 1 - cdf is the survival function, mirroring stats.norm.sf below.
            p1 = 1 - Normal(mu, sd).cdf(neigh_dist)
            p2 = 1 - Normal(mu_t_to_s, sd_t_to_s).cdf(neigh_dist)
        else:
            mu = np.nanmean(neigh_dist, axis=1).reshape(-1, 1)
            p1 = stats.norm.sf(neigh_dist, mu, sd)
            p2 = stats.norm.sf(neigh_dist, mu_t_to_s, sd_t_to_s)

        return 1 - p1 * p2

    def _transform_empiric(self, neigh_dist, neigh_ind):
        """Compute Mutual Proximity from the empiric distance distributions (slow)."""
        hub_reduced_dist = self._empty_like(neigh_dist)
        n_test, n_indexed = neigh_dist.shape
        max_ind = max(self.neigh_ind_t_to_s_.max(), neigh_ind.max())

        # Show progress in hubness reduction loop
        range_n_test = tqdm(
            range(n_test),
            desc=f"MP ({self.method})",
            disable=not self.verbose,
        )
        for i in range_n_test:
            d_i = neigh_dist[i, :][None, :]  # broadcasted afterwards
            d_j = self._zeros((self._numel(d_i), n_indexed))
            for j in range(n_indexed):
                neighbor = neigh_ind[i, j]
                # Default every index to slightly more than the last stored
                # distance of this training point (presumably its largest,
                # assuming rows are sorted ascending -- confirm), then
                # overwrite the entries for which a distance is stored.
                tmp = self._zeros(max_ind + 1) + (
                    self.neigh_dist_t_to_s_[neighbor, -1] + 1e-6
                )
                tmp[self.neigh_ind_t_to_s_[neighbor]] = self.neigh_dist_t_to_s_[
                    neighbor
                ]
                d_j[j, :] = tmp[neigh_ind[i]]
            d = d_i.T
            # Fraction of entries that are farther than d in both d_i and d_j,
            # turned into a distance by subtracting from 1.
            hub_reduced_dist[i, :] = 1.0 - (
                self._sum((d_i > d) & (d_j > d), axis=1) / n_indexed
            )

        return hub_reduced_dist

    def transform(self, neigh_dist, neigh_ind, query) -> Tuple[T, T]:
        """Transform distance between test and training data with Mutual Proximity.

        Parameters
        ----------
        neigh_dist: np.ndarray
            Distance matrix of test objects (rows) against their individual
            k nearest neighbors among the training data (columns).
        neigh_ind: np.ndarray
            Neighbor indices corresponding to the values in neigh_dist
        query
            Ignored

        Returns
        -------
        hub_reduced_dist, neigh_ind
            Mutual Proximity distances, and corresponding neighbor indices

        Raises
        ------
        ValueError
            if self.method is unknown

        Notes
        -----
        The returned distances are NOT sorted! If you use this class directly,
        you will need to sort the returned matrices according to hub_reduced_dist.
        """
        # Each method fits only its own attributes, so finding any one of them
        # is enough to consider the estimator fitted.
        check_is_fitted(
            self,
            [
                "mu_t_to_s_",
                "sd_t_to_s_",
                "neigh_dist_t_to_s_",
                "neigh_ind_t_to_s_",
            ],
            all_or_any=any,
        )

        if self.method == "normal":
            hub_reduced_dist = self._transform_normal(neigh_dist, neigh_ind)
        elif self.method == "empiric":
            hub_reduced_dist = self._transform_empiric(neigh_dist, neigh_ind)
        else:
            # Only reachable if self.method was changed after construction.
            raise ValueError(
                f"Internal: Invalid method {self.method}. Try 'normal' or 'empiric'."
            )

        # Return the hubness reduced distances
        # These must be sorted downstream
        return hub_reduced_dist, neigh_ind
