# Source code for kiez.neighbors.approximate.nmslib

# SPDX-License-Identifier: BSD-3-Clause
# Author: Roman Feldbauer (version for scikit-hubness)
#         Daniel Obraczka (adaptation for kiez)
# PEP 563: Postponed Evaluation of Annotations

from __future__ import annotations

import numpy as np

from kiez.neighbors.neighbor_algorithm_base import NNAlgorithm

try:
    import nmslib
except ImportError:  # pragma: no cover
    nmslib = None

_VERBOSE_THRESH = 2


class NMSLIB(NNAlgorithm):
    """Approximate nearest neighbor search backed by an NMSLIB index.

    The index is created with ``nmslib.init`` and configured with the
    graph parameters given below (``M``, ``efConstruction``, ``post``).

    Parameters
    ----------
    n_candidates: int, default = 5
        number of nearest neighbors used in search
    metric: str, default = 'euclidean'
        distance measure used in search;
        accepted names are listed in :obj:`NMSLIB.valid_metrics`.
        'euclidean', 'l2' and 'minkowski' are stored as 'euclidean',
        'squared_euclidean' and 'sqeuclidean' are stored as 'sqeuclidean'
        (all of these use the nmslib space 'l2');
        'cosine' and 'cosinesimil' use the nmslib space 'cosinesimil'.
    method: str, default = 'hnsw'
        ANN method passed to ``nmslib.init``.
        Currently, only 'hnsw' is supported.
    M: int, default = 16
        maximum number of neighbors in zero or above layers
        of the hierarchical graph
    post_processing: int, default = 2
        More post processing means longer index creation,
        and higher retrieval accuracy.
    ef_construction: int, default = 200
        higher value improves quality of the constructed graph
        but leads to longer indexing times
    n_jobs: int, default = 1
        Number of threads handed to nmslib for indexing and querying
    verbose: int, default = 0
        Verbosity level. In this class, a value of at least
        ``_VERBOSE_THRESH`` (2) enables nmslib's progress output
        while the index is created.

    Notes
    -----
    See the nmslib documentation for more details:
    https://github.com/nmslib/nmslib/blob/master/manual/methods.md
    """

    valid_metrics = (
        "euclidean",
        "l2",
        "minkowski",
        "squared_euclidean",
        "sqeuclidean",
        "cosine",
        "cosinesimil",
    )

    def __init__(
        self,
        n_candidates: int = 5,
        metric: str = "euclidean",
        method: str = "hnsw",
        M: int = 16,
        post_processing: int = 2,
        ef_construction: int = 200,
        n_jobs: int = 1,
        verbose: int = 0,
    ):
        """Validate the metric, derive the nmslib space and store the settings.

        Raises
        ------
        ImportError
            If the optional ``nmslib`` package could not be imported.
        ValueError
            If ``metric`` is not one of :obj:`NMSLIB.valid_metrics`.
        """
        self.space = None
        if nmslib is None:  # pragma: no cover
            raise ImportError(
                "Please install the `nmslib` package, before using this class.\n"
                "$ pip install nmslib"
            )

        squared_aliases = ("squared_euclidean", "sqeuclidean")
        euclidean_aliases = ("euclidean", "l2", "minkowski")
        cosine_aliases = ("cosine", "cosinesimil")

        # Collapse metric aliases onto one canonical name and pick the
        # nmslib space that serves them.
        if metric in squared_aliases:
            metric = "sqeuclidean"
            self.space = "l2"
        elif metric in euclidean_aliases:
            metric = "euclidean"
            self.space = "l2"
        elif metric in cosine_aliases:
            # The cosine aliases are passed on unchanged.
            self.space = "cosinesimil"
        elif metric not in self.__class__.valid_metrics:
            raise ValueError(
                f"Unknown metric please try one of {self.__class__.valid_metrics}"
            )

        # Internal sanity check: every accepted metric must have mapped
        # to one of the two supported spaces.
        assert self.space in [
            "l2",
            "cosinesimil",
        ], f"Internal: self.space={self.space} not allowed"

        super().__init__(n_candidates=n_candidates, metric=metric, n_jobs=n_jobs)
        self.verbose = verbose
        self.method = method
        self.M = M
        self.post_processing = post_processing
        self.ef_construction = ef_construction

    def __repr__(self):
        """Return a one-line summary of the configuration.

        The text ends with a single trailing space after the closing
        parenthesis; this is kept as-is.
        """
        described = ", ".join(
            [
                f"n_candidates={self.n_candidates}",
                f"verbose = {self.verbose}",
                f"method = {self.method}",
                f"M = {self.M}",
                f"space = {self.space}",
                f"post_processing = {self.post_processing}",
                f"ef_construction = {self.ef_construction}",
                f"n_jobs = {self.n_jobs}",
            ]
        )
        return f"{self.__class__.__name__}({described}) "

    def _fit(self, data, is_source: bool):
        """Build and return an nmslib index over ``data``.

        Parameters
        ----------
        data
            Data points handed to ``addDataPointBatch``.
        is_source: bool
            Not used by this implementation.

        Returns
        -------
        The nmslib index object after ``createIndex`` has been called.
        """
        index = nmslib.init(method=self.method, space=self.space)
        index.addDataPointBatch(data)

        build_params = {
            "M": self.M,
            "efConstruction": self.ef_construction,
            "post": self.post_processing,
            "indexThreadQty": self.n_jobs,
        }
        # Progress output is only requested from the verbosity threshold on.
        show_progress = self.verbose >= _VERBOSE_THRESH
        index.createIndex(build_params, print_progress=show_progress)
        return index

    def _kneighbors(self, query, k, index, return_distance, is_self_querying):
        """Query ``index`` for the ``k`` nearest candidates of each query row.

        Parameters
        ----------
        query
            Query points; ``query.shape[0]`` and ``query.dtype`` are read.
        k
            Number of neighbors requested per query.
        index
            Index object providing ``knnQueryBatch`` (as returned by ``_fit``).
        return_distance
            If truthy, distances are returned together with the indices.
        is_self_querying
            Not used by this implementation.

        Returns
        -------
        ``(neigh_dist, neigh_ind)`` if ``return_distance`` is truthy,
        otherwise only ``neigh_ind``. Slots for which no candidate was
        found hold index ``-1`` and distance ``NaN``.
        """
        # Fetch the neighbor candidates
        batch_result = index.knnQueryBatch(query, k=k, num_threads=self.n_jobs)

        # If fewer candidates than required are found for a query,
        # the remaining slots keep index=-1 and distance=NaN
        n_query = query.shape[0]
        neigh_ind = np.full((n_query, k), -1, dtype=np.int32)
        # Multiplying by NaN (rather than filling) keeps the dtype rules of
        # the original expression for any query dtype.
        neigh_dist = np.empty_like(neigh_ind, dtype=query.dtype) * np.nan

        for row, (found_ids, found_dists) in enumerate(batch_result):
            neigh_ind[row, : found_ids.size] = found_ids
            neigh_dist[row, : found_dists.size] = found_dists

        uses_l2 = self.space == "l2"
        if self.space == "cosinesimil":
            # Maps every value x to 1 - x (intended as a conversion between
            # cosine similarity and cosine distance).
            # NOTE(review): confirm which of the two nmslib actually returns.
            neigh_dist *= -1
            neigh_dist += 1
        elif uses_l2 and self.metric == "sqeuclidean":
            neigh_dist **= 2
        elif uses_l2 and self.metric == "euclidean":
            # NOTE(review): the 'sqeuclidean' branch squares the raw values
            # while this one takes their square root; both cannot be right
            # for the same raw unit - confirm what nmslib returns for 'l2'.
            neigh_dist = np.sqrt(neigh_dist)

        return (neigh_dist, neigh_ind) if return_distance else neigh_ind