Source code for skdim._commonfuncs

#
# BSD 3-Clause License
#
# Copyright (c) 2020, Jonathan Bac
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import numpy as np
import numba as nb
import itertools
import numbers
import multiprocessing as mp
import warnings
from sklearn.neighbors import NearestNeighbors
from sklearn.utils.validation import check_array, check_is_fitted
from sklearn.base import BaseEstimator
from abc import abstractmethod


def indComb(NN):
    pt1 = np.tile(range(NN), NN)
    pt2 = np.repeat(range(NN), NN)

    un = pt1 > pt2

    pt1 = pt1[un]
    pt2 = pt2[un]

    return pt1, pt2, np.hstack((pt2[:, None], pt1[:, None]))


@nb.njit
def indnComb(NN, n):
    if n == 1:
        return np.arange(NN).reshape((-1, 1))
    prev = indnComb(NN, n - 1)
    lastind = prev[:, -1]
    ind_cf1 = np.repeat(lastind, NN)
    # ind_cf2 = np.tile(np.arange(NN), len(lastind))
    ind_cf2 = (
        np.repeat(np.arange(NN), len(lastind)).reshape(-1, len(lastind)).T.flatten()
    )  # np.tile replacement

    new_ind = np.where(ind_cf1 < ind_cf2)[0]
    new_ind1 = (new_ind - 1) // NN
    new_ind2 = new_ind % NN
    new_ind2[new_ind2 == 0] = NN
    return np.hstack((prev[new_ind1, :], np.arange(NN)[new_ind2].reshape((-1, 1))))


def efficient_indnComb(n, k, random_generator_):
    """
    memory-efficient indnComb:
    uniformly takes 5000 samples from itertools.combinations(n,k)
    """
    ncomb = binom_coeff(n, k)
    pop = itertools.combinations(range(n), k)
    targets = set(random_generator_.choice(ncomb, min(ncomb, 5000), replace=False))
    return np.array(
        list(itertools.compress(pop, map(targets.__contains__, itertools.count())))
    )


def lens(vectors):
    return np.sqrt(np.sum(vectors ** 2, axis=1))


def proxy(tup):
    function, X, Dict = tup
    return function(X, **Dict)


def get_nn(X, k, n_jobs=1):
    """Compute the k-nearest neighbors of a dataset np.array (n_samples x n_dims)"""
    neigh = NearestNeighbors(n_neighbors=k, n_jobs=n_jobs)
    neigh.fit(X)
    dists, inds = neigh.kneighbors(return_distance=True)
    return dists, inds


def asPointwise(data, class_instance, precomputed_knn=None, n_neighbors=100, n_jobs=1):
    """Use a global estimator as a pointwise one by creating kNN neighborhoods"""
    if precomputed_knn is not None:
        knn = precomputed_knn
    else:
        _, knn = get_nn(data, k=n_neighbors, n_jobs=n_jobs)

    if n_jobs > 1:
        with mp.Pool(n_jobs) as pool:
            # Asynchronously apply the `fit` function to each data point and collect the results
            results = [pool.apply_async(class_instance.fit, (X[i, :],)) for i in knn]
            # Retrieve the computed dimensions
        return np.array([r.get().dimension_ for r in results])
    else:
        return np.array([class_instance.fit(data[i, :]).dimension_ for i in knn])


# class DocInheritorBase(type):
#    """ A metaclass to append GlobalEstimator or LocalEstimator Attributes section docstring to each estimator"""
#
#    def __new__(mcs, class_name, class_bases, class_dict):
#        # inherit class docstring: the docstring is constructed by traversing
#        # the mro for the class and merging their docstrings, with each next
#        # docstring as serving as the 'parent', and the accumulated docstring
#        # serving as the 'child'
#        this_doc = class_dict.get("__doc__", None)
#        for mro_cls in (mro_cls for base in class_bases for mro_cls in base.mro()):
#            prnt_cls_doc = mro_cls.__doc__
#            if prnt_cls_doc is not None:
#                if prnt_cls_doc == "The most base type":
#                    prnt_cls_doc = None
#            this_doc = mcs.class_doc_inherit(prnt_cls_doc, this_doc)
#
#        class_dict["__doc__"] = this_doc
#
#        return type.__new__(mcs, class_name, class_bases, class_dict)
#
#    @staticmethod
#    def class_doc_inherit(prnt_doc, child_doc):
#        """ Merge the docstrings of a parent class and its child.
#
#        Parameters
#        ----------
#        prnt_cls_doc: Union[None, str]
#        child_doc: Union[None, str]
#        """
#        if prnt_doc is None or "dimension_" not in prnt_doc:
#            return child_doc
#        else:
#            if "Attributes" in child_doc:
#                prnt_doc_attr = prnt_doc.index("dimension_")
#                child_doc = child_doc + prnt_doc[prnt_doc_attr:] + "\n"
#            else:
#                prnt_doc_attr = prnt_doc.index("Attributes")
#                child_doc = child_doc + "\n    " + prnt_doc[prnt_doc_attr:]
#        return child_doc


class GlobalEstimator(BaseEstimator):  # , metaclass=DocInheritorBase):
    """ Template base class: inherit BaseEstimator, define transform, fit_transform, fit_pw, transform_pw, fit_transform_pw 
    
    Attributes
    ----------
    dimension_ : {int, float}
        The estimated intrinsic dimension 
    dimension_pw_ : np.array with dtype {int, float}
        Pointwise ID estimates
    dimension_pw_smooth_ : np.array with dtype float
        Smoothed pointwise ID estimates returned if self.fit_pw(smooth=True)
    """

    def _more_tags(self):
        return {
            "_skip_test": "check_methods_subset_invariance"
        }  # skip a test from sklearn.utils.estimator_checks because ID estimators are not subset invariant

    def transform(self, X=None):
        """ Predict dimension after a previous call to self.fit

        Parameters
        ----------
        X : Dummy parameter

        Returns
        -------
        dimension_ : {int, float}
            The estimated ID
        """
        check_is_fitted(self, "is_fitted_")
        return self.dimension_

    def fit_transform(self, X, y=None):
        """Fit estimator and return ID

        Parameters
        ----------
        X : {array-like}, shape (n_samples, n_features)
            The training input samples.

        Returns
        -------
        dimension_ : {int, float}
            The estimated intrinsic dimension
        """
        return self.fit(X).dimension_

    def fit_pw(self, X, precomputed_knn=None, smooth=False, n_neighbors=100, n_jobs=1):
        """Creates an array of pointwise ID estimates (self.dimension_pw_) by fitting the estimator in kNN of each point.

        Parameters
        ----------
        X: np.array (n_samples x n_neighbors)
            Dataset to fit
        precomputed_knn: np.array (n_samples x n_dims)
            An array of precomputed (sorted) nearest neighbor indices
        n_neighbors:
            Number of nearest neighbors to use (ignored when using precomputed_knn)
        n_jobs: int
            Number of processes
        smooth: bool, default = False
            Additionally computes a smoothed version of pointwise estimates by 
            taking the ID of a point as the average ID of each point in its neighborhood (self.dimension_pw_)
           smooth_ 

        Returns
        -------
        self : object
            Returns self
        """
        X = check_array(X, ensure_min_samples=n_neighbors + 1, ensure_min_features=2)

        if precomputed_knn is not None:
            knnidx = precomputed_knn
        else:
            _, knnidx = get_nn(X, k=n_neighbors, n_jobs=n_jobs)

        if n_jobs > 1:
            with mp.Pool(n_jobs) as pool:
                # Asynchronously apply the `fit` function to each data point and collect the results
                results = [pool.apply_async(self.fit, (X[i, :],)) for i in knnidx]
                # Retrieve the computed dimensions
                self.dimension_pw_ = np.array([r.get().dimension_ for r in results])
        else:
            self.dimension_pw_ = np.array(
                [self.fit(X[i, :]).dimension_ for i in knnidx]
            )

        if smooth:
            self.dimension_pw_smooth_ = np.zeros(len(knnidx))
            for i, point_nn in enumerate(knnidx):
                self.dimension_pw_smooth_[i] = np.mean(
                    np.append(self.dimension_pw_[i], self.dimension_pw_[point_nn])
                )
        return self

    def transform_pw(self, X=None):
        """ Return an array of pointwise ID estimates after a previous call to self.fit_pw

        Parameters
        ----------
        X : Dummy parameter

        Returns
        -------
        dimension_pw_ : np.array with dtype {int, float}
            Pointwise ID estimates
        dimension_pw_smooth_ : np.array with dtype float
            Smoothed pointwise ID estimates returned if self.fit_pw(smooth=True)
        """

        check_is_fitted(
            self,
            "dimension_pw_",
            msg=(
                "This class instance is not fitted yet. Call 'fit_pw' with "
                "appropriate arguments before using this method."
            ),
        )

        if hasattr(self, "dimension_pw_smooth_"):
            return self.dimension_pw_, self.dimension_pw_smooth_
        else:
            return self.dimension_pw_

    def fit_transform_pw(
        self, X, precomputed_knn=None, smooth=False, n_neighbors=100, n_jobs=1
    ):
        """Returns an array of pointwise ID estimates by fitting the estimator in kNN of each point.

        Parameters
        ----------
        X: np.array (n_samples x n_neighbors)
            Dataset to fit
        precomputed_knn: bool
            An array of precomputed (sorted) nearest neighbor indices
        n_neighbors:
            Number of nearest neighbors to use (ignored when using precomputed_knn)
        n_jobs: int
            Number of processes
        smooth: bool, default = False
            Additionally computes a smoothed version of pointwise estimates by 
            taking the ID of a point as the average ID of each point in its neighborhood (self.dimension_pw_)
           smooth_ 

        Returns
        -------
        dimension_pw_ : np.array with dtype {int, float}
            Pointwise ID estimates
        dimension_pw_smooth_ : np.array with dtype float
            Smoothed pointwise ID estimates returned if self.fit_pw(smooth=True)
        """

        X = check_array(X, ensure_min_samples=n_neighbors + 1, ensure_min_features=2)

        if precomputed_knn is not None:
            knnidx = precomputed_knn
        else:
            _, knnidx = get_nn(X, k=n_neighbors, n_jobs=n_jobs)

        if n_jobs > 1:
            with mp.Pool(n_jobs) as pool:
                # Asynchronously apply the `fit` function to each data point and collect the results
                results = [pool.apply_async(self.fit, (X[i, :],)) for i in knnidx]
                # Retrieve the computed dimensions
                dimension_pw_ = np.array([r.get().dimension_ for r in results])
        else:
            dimension_pw_ = np.array([self.fit(X[i, :]).dimension_ for i in knnidx])

        if smooth:
            dimension_pw_smooth_ = np.zeros(len(knnidx))
            for i, point_nn in enumerate(knnidx):
                dimension_pw_smooth_[i] = np.mean(
                    np.append(dimension_pw_[i], dimension_pw_[point_nn])
                )
            return dimension_pw_, dimension_pw_smooth_
        else:
            return dimension_pw_


class LocalEstimator(BaseEstimator):  # , metaclass=DocInheritorBase):
    """ Template base class: generic _fit, fit, transform_pw for local ID estimators 
    
    Attributes
    ----------
    dimension_ : {int, float}
        The estimated intrinsic dimension 
    dimension_pw_ : np.array with dtype {int, float}
        Pointwise ID estimates
    dimension_pw_smooth_ : np.array with dtype float
        Smoothed pointwise ID estimates returned if self.fit(smooth=True)
    """

    _N_NEIGHBORS: int = 100  # default neighborhood parameter

    def _more_tags(self):
        """Skips a test from sklearn.utils.estimator_checks because ID estimators are not subset invariant"""
        return {"_skip_test": "check_methods_subset_invariance"}

    @abstractmethod
    def _fit(self, X, dists=None, knnidx=None):
        """ Custom method to each local ID estimator, called in fit """
        self._my_ID_estimator_func(X, dists, knnidx)

    def fit(
        self,
        X,
        y=None,
        precomputed_knn_arrays=None,
        smooth=False,
        n_neighbors=None,
        comb="mean",
        n_jobs=1,
    ):
        """Fitting method for local ID estimators

        Parameters
        ----------
        X : {array-like}, shape (n_samples, n_features)
            The training input samples.
        y : dummy parameter to respect the sklearn API
        precomputed_knn_arrays: tuple[ np.array (n_samples x n_dims), np.array (n_samples x n_dims) ]
            Provide two precomputed arrays: (sorted nearest neighbor distances, sorted nearest neighbor indices)
        n_neighbors: int, default=self._N_NEIGHBORS
            Number of nearest neighbors to use (ignored when using precomputed_knn)
        n_jobs: int
            Number of processes
        smooth: bool, default = False
            Additionally computes a smoothed version of pointwise estimates by 
            taking the ID of a point as the average ID of each point in its neighborhood (self.dimension_pw_)
            smooth_ 

        Returns
        -------
        self : object
            Returns self.
        """
        # check inputs and define internal parameters
        if n_neighbors is None:
            n_neighbors = self._N_NEIGHBORS
        if n_neighbors >= len(X):
            warnings.warn("n_neighbors >= len(X), setting n_neighbors = len(X)-1")
            n_neighbors = len(X) - 1
        self.n_neighbors = n_neighbors
        self.comb = comb

        X = check_array(
            X, ensure_min_samples=self.n_neighbors + 1, ensure_min_features=2
        )

        if precomputed_knn_arrays is not None:
            dists, knnidx = precomputed_knn_arrays
        else:
            dists, knnidx = get_nn(X, k=self.n_neighbors, n_jobs=n_jobs)

        # fit
        self._fit(X=X, dists=dists, knnidx=knnidx)

        # combine local estimates
        if comb == "mean":
            self.dimension_ = np.mean(self.dimension_pw_)
        elif comb == "median":
            self.dimension_ = np.median(self.dimension_pw_)
        else:
            raise ValueError("Invalid comb parameter. It has to be 'mean' or 'median'")

        # compute smoothed local estimates
        if smooth:
            self.dimension_pw_smooth_ = np.zeros(len(knnidx))
            for i, point_nn in enumerate(knnidx):
                self.dimension_pw_smooth_[i] = np.mean(
                    np.append(self.dimension_pw_[i], self.dimension_pw_[point_nn])
                )
            self.is_fitted_pw_smooth_ = True
        self.is_fitted_pw_ = True
        self.is_fitted_ = True
        return self

    def transform(self, X=None):
        """ Predict ID after a previous call to self.fit

        Parameters
        ----------
        X : Dummy parameter

        Returns
        -------
        dimension_ : {int, float}
            The estimated ID
        """
        check_is_fitted(self, "is_fitted_")
        return self.dimension_

    def fit_transform(
        self,
        X,
        y=None,
        precomputed_knn_arrays=None,
        smooth=False,
        n_neighbors=None,
        comb="mean",
        n_jobs=1,
    ):
        """Fit-transform method for local ID estimators

        Parameters
        ----------
        X : {array-like}, shape (n_samples, n_features)
            The training input samples.
        y : dummy parameter to respect the sklearn API
        precomputed_knn_arrays: tuple[ np.array (n_samples x n_dims), np.array (n_samples x n_dims) ]
            Provide two precomputed arrays: (sorted nearest neighbor distances, sorted nearest neighbor indices)
        n_neighbors: int, default=self._N_NEIGHBORS
            Number of nearest neighbors to use (ignored when using precomputed_knn)
        n_jobs: int
            Number of processes
        smooth: bool, default = False
            Additionally computes a smoothed version of pointwise estimates by 
            taking the ID of a point as the average ID of each point in its neighborhood (self.dimension_pw_)
            smooth_ 

        Returns
        -------
        dimension_ : {int, float}
            The estimated intrinsic dimension
        """

        return self.fit(
            X,
            precomputed_knn_arrays=precomputed_knn_arrays,
            smooth=smooth,
            n_neighbors=n_neighbors,
            comb=comb,
            n_jobs=n_jobs,
        ).dimension_

    def transform_pw(self, X=None):
        """ Return an array of pointwise ID estimates after a previous call to self.fit_pw

        Parameters
        ----------
        X : Dummy parameter

        Returns
        -------
        dimension_pw : np.array 
            Pointwise ID estimates
        dimension_pw_smooth : np.array 
            If self.fit_pw(smooth=True), additionally returns smoothed pointwise ID estimates
        """

        check_is_fitted(
            self,
            "dimension_pw_",
            msg=(
                "This class instance is not fitted yet. Call 'fit_pw' with "
                "appropriate arguments before using this method."
            ),
        )

        if hasattr(self, "dimension_pw_smooth_"):
            return self.dimension_pw_, self.dimension_pw_smooth_
        else:
            return self.dimension_pw_

    def fit_transform_pw(
        self, X, precomputed_knn_arrays=None, smooth=False, n_neighbors=None, n_jobs=1
    ):
        """
        Returns an array of pointwise ID estimates by fitting the estimator in kNN of each point.

        Parameters
        ----------
        X: np.array (n_samples x n_neighbors)
            Dataset to fit
        precomputed_knn_arrays: tuple[ np.array (n_samples x n_dims), np.array (n_samples x n_dims) ]
            Provide two precomputed arrays: (sorted nearest neighbor distances, sorted nearest neighbor indices)
        n_neighbors: int, default=self._N_NEIGHBORS
            Number of nearest neighbors to use (ignored when using precomputed_knn).
        n_jobs: int
            Number of processes
        smooth: bool, default = False
            Additionally computes a smoothed version of pointwise estimates by 
            taking the ID of a point as the average ID of each point in its neighborhood (self.dimension_pw_)
           smooth_ 

        Returns
        -------
        dimension_pw : np.array 
            Pointwise ID estimates
        dimension_pw_smooth : np.array 
            If smooth is True, additionally returns smoothed pointwise ID estimates
        """

        self.fit(
            X,
            precomputed_knn_arrays=precomputed_knn_arrays,
            smooth=smooth,
            n_neighbors=n_neighbors,
            n_jobs=n_jobs,
        )

        if smooth:
            return self.dimension_pw_, self.dimension_pw_smooth_
        else:
            return self.dimension_pw_


# def mean_local_id(local_id, knnidx):
#    """
#    Compute the mean ID of all neighborhoods in which a point appears
#
#    Parameters
#    ----------
#    local_id : list or np.array
#        list of local ID for each point
#    knnidx : np.array
#        indices of kNN for each point returned by function get_nn
#
#    Results
#    -------
#    dimension_pw_smooth_ : np.array
#        list of mean local ID for each point
#
#    """
#    dimension_pw_smooth_ = np.zeros(len(local_id))
#    for point_i in range(len(local_id)):
#        # get all points which have this point in their neighbourhoods
#        all_neighborhoods_with_point_i = np.append(
#            np.where(knnidx == point_i)[0], point_i
#        )
#        # get the mean local ID of these points
#        dimension_pw_smooth_[point_i] = local_id[all_neighborhoods_with_point_i].mean()
#    return dimension_pw_smooth_


def binom_coeff(n, k):
    """
    Taken from : https://stackoverflow.com/questions/26560726/python-binomial-coefficient
    Compute the number of ways to choose $k$ elements out of a pile of $n.$

    Use an iterative approach with the multiplicative formula:
    $$\frac{n!}{k!(n - k)!} =
    \frac{n(n - 1)\dots(n - k + 1)}{k(k-1)\dots(1)} =
    \prod_{i = 1}^{k}\frac{n + 1 - i}{i}$$

    Also rely on the symmetry: $C_n^k = C_n^{n - k},$ so the product can
    be calculated up to $\min(k, n - k).$

    :param n: the size of the pile of elements
    :param k: the number of elements to take from the pile
    :return: the number of ways to choose k elements out of a pile of n
    """

    # When k out of sensible range, should probably throw an exception.
    # For compatibility with scipy.special.{comb, binom} returns 0 instead.
    if k < 0 or k > n:
        return 0

    if k == 0 or k == n:
        return 1

    total_ways = 1
    for i in range(min(k, n - k)):
        total_ways = total_ways * (n - i) // (i + 1)

    return total_ways


def check_random_generator(seed):
    """Turn seed into a numpy.random._generator.Generator' instance
    Parameters
    ----------
    seed : None | int | instance of RandomState
        If seed is None, return the RandomState singleton used by np.random.
        If seed is an int, return a new RandomState instance seeded with seed.
        If seed is already a RandomState instance, return it.
        Otherwise raise ValueError.
    """
    if seed is None or seed is np.random:
        return np.random.default_rng()
    if isinstance(seed, numbers.Integral):
        return np.random.default_rng(seed)
    if isinstance(seed, np.random._generator.Generator):
        return seed
    raise ValueError(
        "%r cannot be used to seed a numpy.random._generator.Generator"
        " instance" % seed
    )