#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# adapted from skhubness: https://github.com/VarIr/scikit-hubness/
"""Estimate hubness in datasets."""
from __future__ import annotations
import logging
import warnings
from typing import Optional, Union
import numpy as np
from scipy import stats
from tqdm.auto import tqdm
# Available hubness measures
VALID_HUBNESS_MEASURES = [
"all",
"all_but_gini",
"k_skewness",
"k_skewness_truncnorm",
"atkinson",
"gini",
"robinhood",
"antihubs",
"antihub_occurrence",
"hubs",
"hub_occurrence",
"groupie_ratio",
"k_occurrence",
]
_SPACE_LIMIT = 10000
def _calc_skewness_truncnorm(k_occurrence: np.ndarray) -> float:
"""Hubness measure; corrected for non-negativity of k-occurrence.
Hubness as skewness of truncated normal distribution
estimated from k-occurrence histogram.
Parameters
----------
k_occurrence: ndarray
Reverse nearest neighbor count for each object.
Returns
-------
skew_truncnorm
"""
clip_left = 0
clip_right = np.iinfo(np.int64).max
k_occurrence_mean = k_occurrence.mean()
k_occurrence_std = k_occurrence.std(ddof=1)
a = (clip_left - k_occurrence_mean) / k_occurrence_std
b = (clip_right - k_occurrence_mean) / k_occurrence_std
return stats.truncnorm(a, b).moment(3)
def _calc_gini_index(
k_occurrence: np.ndarray, limiting="memory", verbose: int = 0
) -> float:
"""Hubness measure; Gini index.
Parameters
----------
k_occurrence: ndarray
Reverse nearest neighbor count for each object.
limiting: 'memory' or 'cpu'
If 'cpu', use fast implementation with high memory usage,
if 'memory', use slightly slower, but memory-efficient implementation,
otherwise use naive implementation (slow, low memory usage)
verbose: int
control verbosity
Returns
-------
gini_index
"""
n = k_occurrence.size
if limiting in ["memory", "space"]:
numerator = 0
for i in tqdm(range(n), disable=not verbose, desc="Gini"):
numerator += np.sum(np.abs(k_occurrence[:] - k_occurrence[i]))
elif limiting in ["time", "cpu"]:
numerator = np.sum(
np.abs(k_occurrence.reshape(1, -1) - k_occurrence.reshape(-1, 1))
)
else: # slow naive implementation
n = k_occurrence.size
numerator = 0
for i in range(n):
for j in range(n):
numerator += np.abs(k_occurrence[i] - k_occurrence[j])
denominator = 2 * n * np.sum(k_occurrence)
return numerator / denominator
def _calc_robinhood_index(k_occurrence: np.ndarray) -> float:
"""Hubness measure; Robin hood/Hoover/Schutz index.
Parameters
----------
k_occurrence: ndarray
Reverse nearest neighbor count for each object.
Returns
-------
robinhood_index
Notes
-----
The Robin Hood index was proposed in [1]_ and is especially suited
for hubness estimation in large data sets. Additionally, it offers
straight-forward interpretability by answering the question:
What share of k-occurrence must be redistributed, so that all objects
are equally often nearest neighbors to others?
References
----------
.. [1] `Feldbauer, R.; Leodolter, M.; Plant, C. & Flexer, A.
Fast approximate hubness reduction for large high-dimensional data.
IEEE International Conference of Big Knowledge (2018).`
"""
numerator = 0.5 * float(np.sum(np.abs(k_occurrence - k_occurrence.mean())))
denominator = float(np.sum(k_occurrence))
return numerator / denominator
def _calc_atkinson_index(k_occurrence: np.ndarray, eps: float = 0.5) -> float:
"""Hubness measure; Atkinson index.
Parameters
----------
k_occurrence: ndarray
Reverse nearest neighbor count for each object.
eps: float
'Income' weight. Turns the index into a normative measure.
Returns
-------
atkinson_index
"""
if eps == 1:
term = np.prod(k_occurrence) ** (1.0 / k_occurrence.size)
else:
term = np.mean(k_occurrence ** (1 - eps)) ** (1 / (1 - eps))
return float(1.0 - 1.0 / k_occurrence.mean() * term)
def _calc_antihub_occurrence(k_occurrence: np.ndarray) -> tuple[np.ndarray, float]:
"""Proportion of antihubs in data set.
Antihubs are objects that are never among the nearest neighbors
of other objects.
Parameters
----------
k_occurrence: ndarray
Reverse nearest neighbor count for each object.
Returns
-------
antihubs, antihub_occurrence
"""
antihubs = np.argwhere(k_occurrence == 0).ravel()
antihub_occurrence = antihubs.size / k_occurrence.size
return antihubs, antihub_occurrence
def _calc_hub_occurrence(
k: int, k_occurrence: np.ndarray, n_test: int, hub_size: float = 2
) -> tuple[np.ndarray, float]:
"""Proportion of nearest neighbor slots occupied by hubs.
Parameters
----------
k: int
Specifies the number of nearest neighbors
k_occurrence: ndarray
Reverse nearest neighbor count for each object.
n_test: int
Number of queries (or objects in a test set)
hub_size: float
Factor to determine hubs
Returns
-------
hubs, hub_occurrence
"""
hubs = np.argwhere(k_occurrence >= hub_size * k).ravel()
hub_occurrence = k_occurrence[hubs].sum() / k / n_test
return hubs, hub_occurrence
[docs]def hubness_score(
nn_ind: np.ndarray,
target_samples: int,
*,
k: Optional[int] = None,
hub_size: float = 2.0,
verbose: int = 0,
return_value: str = "all_but_gini",
store_k_occurrence: bool = False,
) -> Union[float, dict]:
"""Calculate hubness scores from given neighbor indices.
Utilizes findings from [1]_ and [2]_.
Parameters
----------
nn_ind : np.ndarray
Neighbor index matrix
target_samples: int
number of entities in the target space
k : int
number of k for k-nearest neighbor
hub_size : float
Hubs are defined as objects with k-occurrence > hub_size * k.
verbose : int
Level of output messages
return_value : str
Hubness measure to return
By default, return all but gini,
because gini is slow on large datasets
Use "all" to return a dict of all available measures,
or check `kiez.analysis.VALID_HUBNESS_MEASURE`
for available measures.
store_k_occurrence: bool
Whether to save the k-occurrence. Requires O(n_test) memory.
Returns
-------
hubness_measure: float or dict
Return the hubness measure as indicated by `return_value`.
if return_value is 'all',
a dict of all hubness measures is returned.
Raises
------
ValueError
If nn_ind has wrong type
References
----------
.. [1] `Radovanović, M.; Nanopoulos, A. & Ivanovic, M.
Hubs in space: Popular nearest neighbors in high-dimensional data.
Journal of Machine Learning Research, 2010, 11, 2487-2531`
.. [2] `Feldbauer, R.; Leodolter, M.; Plant, C. & Flexer, A.
Fast approximate hubness reduction for large high-dimensional data.
IEEE International Conference of Big Knowledge (2018).`
Examples
--------
>>> from kiez import Kiez
>>> from kiez.analysis import hubness_score
>>> import numpy as np
>>> # create example data
>>> rng = np.random.RandomState(0)
>>> source = rng.rand(100,50)
>>> target = rng.rand(100,50)
>>> # fit and get neighbors
>>> k_inst = Kiez()
>>> k_inst.fit(source, target)
>>> nn_ind = k_inst.kneighbors(return_distance=False)
>>> # get hubness
>>> hub_score = hubness_score(nn_ind, target.shape[1])
>>> hub_score["robinhood"]
0.31
"""
n_train = nn_ind.shape[0]
n_test = target_samples
k_neighbors = nn_ind.copy()
if k is None:
k = nn_ind.shape[1]
elif k < k_neighbors.shape[1]:
k_neighbors = k_neighbors[:, :k]
elif k > k_neighbors.shape[1]:
k = nn_ind.shape[1]
warnings.warn(f"k > nn_ind.shape[1], k will be set to {k}", stacklevel=2)
assert k is not None
# Negative indices can occur, when ANN does not find enough neighbors,
# and must be removed
mask = k_neighbors < 0
if np.any(mask):
k_neighbors = k_neighbors[~mask]
del mask
try:
k_occurrence = np.bincount(k_neighbors.astype(int).ravel(), minlength=n_train)
except ValueError as e:
logging.info(f"k_occurence failed with the following neighbors: {k_neighbors}")
raise e
# traditional skewness measure
k_skewness = stats.skew(k_occurrence)
# new skewness measure (truncated normal distribution)
k_skewness_truncnorm = _calc_skewness_truncnorm(k_occurrence)
# Gini index
if return_value in ["gini", "all"]:
limiting = "space" if k_occurrence.shape[0] > _SPACE_LIMIT else "time"
gini_index = _calc_gini_index(k_occurrence, limiting, verbose=verbose)
else:
gini_index = np.nan
# Robin Hood index
robinhood_index = _calc_robinhood_index(k_occurrence)
# Atkinson index
atkinson_index = _calc_atkinson_index(k_occurrence)
# anti-hub occurrence
antihubs, antihub_occurrence = _calc_antihub_occurrence(k_occurrence)
# hub occurrence
hubs, hub_occurrence = _calc_hub_occurrence(
k=k,
k_occurrence=k_occurrence,
n_test=n_test,
hub_size=hub_size,
)
# Largest hub
groupie_ratio = k_occurrence.max() / n_test / k
# Dictionary of all hubness measures
hubness_measures = {
"k_skewness": k_skewness,
"k_skewness_truncnorm": k_skewness_truncnorm,
"atkinson": atkinson_index,
"gini": gini_index,
"robinhood": robinhood_index,
"antihubs": antihubs,
"antihub_occurrence": antihub_occurrence,
"hubs": hubs,
"hub_occurrence": hub_occurrence,
"groupie_ratio": groupie_ratio,
}
if store_k_occurrence:
hubness_measures["k_occurrence"] = k_occurrence
if return_value == "all":
return hubness_measures
if return_value == "all_but_gini":
del hubness_measures["gini"]
return hubness_measures
return hubness_measures[return_value]