Source code for skdim.id._FisherS

#
# BSD 3-Clause License
#
# Copyright (c) 2020, Jonathan Bac
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from sklearn.utils.validation import check_array
import numba as nb
import numpy as np
import sklearn.decomposition as sk
from scipy.special import lambertw
from matplotlib import pyplot as plt
from .._commonfuncs import GlobalEstimator
import warnings

warnings.filterwarnings("ignore")


[docs]class FisherS(GlobalEstimator): """Intrinsic dimension estimation using the Fisher Separability algorithm. [Albergante2019]_ Parameters ---------- conditional_number: float, default=10 A positive real value used to select the top principal components. We consider only PCs with eigen values which are not less than the maximal eigenvalue divided by conditional_number project_on_sphere: bool, default=True A boolean value indicating if projecting on a sphere should be performed. test_alphas: 2D np.array with dtype float A row vector of floats, with alpha range, the values must be given increasing within (0,1) interval. Default is np.arange(.6,1,.02)[None]. produce_plots: bool, default=False A boolean value indicating if the standard plots need to be drawn. verbose: bool Whether to print the number of retained principal components limit_maxdim: bool Whether to cap estimated maxdim to the embedding dimension """ def __init__( self, conditional_number=10, project_on_sphere=1, alphas=None, produce_plots=False, verbose=0, limit_maxdim=False, ): self.conditional_number = conditional_number self.project_on_sphere = project_on_sphere self.alphas = alphas self.produce_plots = produce_plots self.verbose = verbose self.limit_maxdim = limit_maxdim
[docs] def fit(self, X, y=None): """ A reference implementation of a fitting function. Parameters ---------- X : {array-like}, shape (n_samples, n_features) The training input samples. y : dummy parameter to respect the sklearn API Returns ------- self : object Returns self. self.dimension_: float The estimated intrinsic dimension self.n_alpha : 1D np.array, float Effective dimension profile as a function of alpha self.n_single : float A single estimate for the effective dimension self.p_alpha : 2D np.array, float Distributions as a function of alpha, matrix with columns corresponding to the alpha values, and with rows corresponding to objects. self.separable_fraction : 1D np.array, float Separable fraction of data points as a function of alpha self.alphas : 2D np.array, float Input alpha values """ X = check_array(X, ensure_min_samples=2, ensure_min_features=2) # test_alphas introduced to pass sklearn checks (sklearn doesn't take arrays as default parameters) if self.alphas is None: self._alphas = np.arange(0.6, 1, 0.02)[None] else: self._alphas = self.alphas ( self.n_alpha_, self.dimension_, self.p_alpha_, self.alphas_, self.separable_fraction_, self.Xp_, ) = self._SeparabilityAnalysis(X) self.is_fitted_ = True # `fit` should always return `self` return self
@staticmethod @nb.njit def _histc(X, bins): map_to_bins = np.digitize(X, bins) r = np.zeros((len(X[0, :]), len(bins))) for j in range(len(map_to_bins[0, :])): for i in map_to_bins[:, j]: r[j, i - 1] += 1 return r def _preprocessing(self, X, center, dimred, whiten): """ Preprocessing of the dataset Inputs X is n-by-d data matrix with n d-dimensional datapoints. center is boolean. True means subtraction of mean vector. dimred is boolean. True means applying of dimensionality reduction with PCA. Number of used PCs is defined by conditional_number argument. whiten is boolean. True means applying of whitenning. True whiten automatically caused true dimred. project_on_sphere is boolean. True means projecting data onto unit sphere Outputs X is preprocessed data matrix. """ # centering sampleMean = np.mean(X, axis=0) if center: X = X - sampleMean # dimensionality reduction if requested dimensionality reduction or whitening if dimred or whiten: pca = sk.PCA() u = pca.fit_transform(X) v = pca.components_.T s = pca.explained_variance_ sc = s / s[0] ind = np.where(sc > 1 / self.conditional_number)[0] X = X @ v[:, ind] if self.verbose: print( "%i components are retained using conditional_number=%2.2f" % (len(ind), self.conditional_number) ) # whitening if whiten: X = u[:, ind] st = np.std(X, axis=0, ddof=1) X = X / st # #project on sphere (scale each vector to unit length) if self.project_on_sphere: st = np.sqrt(np.sum(X ** 2, axis=1)) st = np.array([st]).T X = X / st return X @staticmethod def _probability_inseparable_sphere(alphas, n): """ %probability_inseparable_sphere calculate theoretical probability for point %to be inseparable for dimension n % %Inputs: % alphas is 1-by-d vector of possible alphas. Must be row vector or scalar % n is c-by-1 vector of dimnesions. Must be column vector or scalar. % %Outputs: % p is c-by-d matrix of probabilities.""" p = np.power((1 - np.power(alphas, 2)), (n - 1) / 2) / ( alphas * np.sqrt(2 * np.pi * n) ) return p # def _checkSeparability(self, xy): # dxy = np.diag(xy) # sm = (xy/dxy).T # sm = sm - np.diag(np.diag(sm)) # sm = sm > self._alphas # py = sum(sm.T) # py = py/len(py[0, :]) # separ_fraction = sum(py == 0)/len(py[0, :]) # # return separ_fraction, py def _checkSeparabilityMultipleAlpha(self, data): """%checkSeparabilityMultipleAlpha calculate fraction of points inseparable %for each alpha and fraction of points which are inseparable from each %point for different alpha. % %Inputs: % data is data matrix to calculate separability. Each row contains one % data point. % alphas is array of alphas to test separability. % %Outputs: % separ_fraction fraction of points inseparable from at least one point. % Fraction is calculated for each alpha. % py is n-by-m matrix. py(i,j) is fraction of points which are % inseparable from point data(i, :) for alphas(j).""" # Number of points per 1 loop. 20k assumes approx 3.2GB nP = 2000 alphas = self._alphas # Normalize alphas if len(alphas[:, 0]) > 1: alphas = alphas.T addedone = 0 if max(self._alphas[0, :]) < 1: alphas = np.array([np.append(alphas, 1)]) addedone = 1 alphas = np.concatenate([[float("-inf")], alphas[0, :], [float("inf")]]) n = len(data) counts = np.zeros((n, len(alphas))) leng = np.zeros((n, 1)) for k in range(0, n, nP): # print('Chunk +{}'.format(k)) e = k + nP if e > n: e = n # Calculate diagonal part, divide each row by diagonal element xy = data[k:e, :] @ data[k:e, :].T leng[k:e] = np.diag(xy)[:, None] xy = xy - np.diag(leng[k:e].squeeze()) xy = xy / leng[k:e] counts[k:e, :] = counts[k:e, :] + self._histc(xy.T, alphas) # Calculate nondiagonal part for kk in range(0, n, nP): # Ignore diagonal part if k == kk: continue ee = kk + nP if ee > n: ee = n xy = data[k:e, :] @ data[kk:ee, :].T xy = xy / leng[k:e] counts[k:e, :] = counts[k:e, :] + self._histc(xy.T, alphas) # Calculate cumulative sum counts = np.cumsum(counts[:, ::-1], axis=1)[:, ::-1] # print(counts) py = counts / (n) py = py.T if addedone: py = py[1:-2, :] else: py = py[1:-1, :] separ_fraction = sum(py == 0) / len(py[0, :]) return separ_fraction, py def _dimension_uniform_sphere(self, py): """ %Gives an estimation of the dimension of uniformly sampled n-sphere %corresponding to the average probability of being inseparable and a margin %value % %Inputs: % py - average fraction of data points which are INseparable. % alphas - set of values (margins), must be in the range (0;1) % It is assumed that the length of py and alpha vectors must be of the % same. % %Outputs: % n - effective dimension profile as a function of alpha % n_single_estimate - a single estimate for the effective dimension % alfa_single_estimate is alpha for n_single_estimate. """ if len(py) != len(self._alphas[0, :]): raise ValueError( "length of py (%i) and alpha (%i) does not match" % (len(py), len(self._alphas[0, :])) ) if np.sum(self._alphas <= 0) > 0 or np.sum(self._alphas >= 1) > 0: raise ValueError( [ '"Alphas" must be a real vector, with alpha range, the values must be within (0,1) interval' ] ) # Calculate dimension for each alpha n = np.zeros((len(self._alphas[0, :]))) for i in range(len(self._alphas[0, :])): if py[i] == 0: # All points are separable. Nothing to do and not interesting n[i] = np.nan else: p = py[i] a2 = self._alphas[0, i] ** 2 w = np.log(1 - a2) n[i] = np.real(lambertw(-(w / (2 * np.pi * p * p * a2 * (1 - a2))))) / ( -w ) n[n == np.inf] = float("nan") # Find indices of alphas which are not completely separable inds = np.where(~np.isnan(n))[0] if len(inds) == 0: warnings.warn("All points are fully separable for any of the chosen alphas") return n, np.array([np.nan]), np.nan # Find the maximal value of such alpha alpha_max = max(self._alphas[0, inds]) # The reference alpha is the closest to 90 of maximal partially separable alpha alpha_ref = alpha_max * 0.9 k = np.where( abs(self._alphas[0, inds] - alpha_ref) == min(abs(self._alphas[0, :] - alpha_ref)) )[0] # Get corresponding values alfa_single_estimate = self._alphas[0, inds[k]] n_single_estimate = n[inds[k]] return n, n_single_estimate, alfa_single_estimate # def _dimension_uniform_sphere_robust(self, py): # '''modification to return selected index and handle the case where all values are 0''' # if len(py) != len(self._alphas[0, :]): # raise ValueError('length of py (%i) and alpha (%i) does not match' % ( # len(py), len(self._alphas[0, :]))) # # if np.sum(self._alphas <= 0) > 0 or np.sum(self._alphas >= 1) > 0: # raise ValueError( # ['"Alphas" must be a real vector, with alpha range, the values must be within (0,1) interval']) # # # Calculate dimension for each alpha # n = np.zeros((len(self._alphas[0, :]))) # for i in range(len(self._alphas[0, :])): # if py[i] == 0: # # All points are separable. Nothing to do and not interesting # n[i] = np.nan # else: # p = py[i] # a2 = self._alphas[0, i]**2 # w = np.log(1-a2) # n[i] = lambertw(-(w/(2*np.pi*p*p*a2*(1-a2))))/(-w) # # n[n == np.inf] = float('nan') # # Find indices of alphas which are not completely separable # inds = np.where(~np.isnan(n))[0] # if inds.size == 0: # n_single_estimate = np.nan # alfa_single_estimate = np.nan # return n, n_single_estimate, alfa_single_estimate # else: # # Find the maximal value of such alpha # alpha_max = max(self._alphas[0, inds]) # # The reference alpha is the closest to 90 of maximal partially separable alpha # alpha_ref = alpha_max*0.9 # k = np.where(abs(self._alphas[0, inds]-alpha_ref) # == min(abs(self._alphas[0, :]-alpha_ref)))[0] # # Get corresponding values # alfa_single_estimate = self._alphas[0, inds[k]] # n_single_estimate = n[inds[k]] # # return n, n_single_estimate, alfa_single_estimate, inds[k]
[docs] def point_inseparability_to_pointID( self, idx="all_inseparable", force_definite_dim=True, verbose=True ): """ Turn pointwise inseparability probability into pointwise global ID Inputs : args : same as SeparabilityAnalysis kwargs : idx : int, string int for custom alpha index 'all_inseparable' to choose alpha where lal points have non-zero inseparability probability 'selected' to keep global alpha selected force_definite_dim : bool whether to force fully separable points to take the minimum detectable inseparability value (1/(n-1)) (i.e., maximal detectable dimension) """ if idx == "all_inseparable": # all points are inseparable selected_idx = np.argwhere(np.all(self.p_alpha_ != 0, axis=1)).max() elif idx == "selected": # globally selected alpha selected_idx = (self.n_alpha_ == self.dimension_).tolist().index(True) elif type(idx) == int: selected_idx = idx else: raise ValueError("unknown idx parameter") # select palpha and corresponding alpha palpha_selected = self.p_alpha_[selected_idx, :] alpha_selected = self._alphas[0, selected_idx] py = palpha_selected.copy() _alphas = np.repeat(alpha_selected, len(palpha_selected))[None] if force_definite_dim: py[py == 0] = 1 / len(py) if len(py) != len(_alphas[0, :]): raise ValueError( "length of py (%i) and alpha (%i) does not match" % (len(py), len(_alphas[0, :])) ) if np.sum(_alphas <= 0) > 0 or np.sum(_alphas >= 1) > 0: raise ValueError( [ '"Alphas" must be a real vector, with alpha range, the values must be within (0,1) interval' ] ) # Calculate dimension for each alpha n = np.zeros((len(_alphas[0, :]))) for i in range(len(_alphas[0, :])): if py[i] == 0: # All points are separable. Nothing to do and not interesting n[i] = np.nan else: p = py[i] a2 = _alphas[0, i] ** 2 w = np.log(1 - a2) n[i] = np.real(lambertw(-(w / (2 * np.pi * p * p * a2 * (1 - a2))))) / ( -w ) n[n == np.inf] = float("nan") # Find indices of alphas which are not completely separable inds = np.where(~np.isnan(n))[0] if self.verbose: print( str(len(inds)) + "/" + str(len(py)), "points have nonzero inseparability probability for chosen alpha = " + str(round(alpha_selected, 2)) + f", force_definite_dim = {force_definite_dim}", ) return n, inds
[docs] def getSeparabilityGraph(self, idx="all_inseparable", top_edges=10000): data = self.Xp_ if idx == "all_inseparable": # all points are inseparable selected_idx = np.argwhere(np.all(self.p_alpha_ != 0, axis=1)).max() elif idx == "selected": # globally selected alpha selected_idx = (self.n_alpha_ == self.dimension_).tolist().index(True) elif type(idx) == int: selected_idx = idx else: raise ValueError("unknown idx parameter") alpha_selected = self._alphas[0, selected_idx] return self.buildSeparabilityGraph(data, alpha_selected, top_edges=top_edges)
[docs] @staticmethod def plotSeparabilityGraph(x, y, edges, alpha=0.3): for i in range(len(edges)): ii = edges[i][0] jj = edges[i][1] plt.plot([x[ii], x[jj]], [y[ii], y[jj]], "k-", alpha=alpha)
[docs] @staticmethod def buildSeparabilityGraph(data, alpha, top_edges=10000): """weighted directed separability graph, represented by a list of tuples (point i, point j) and an array of weights each tuple is the observation that point i is inseparable from j, the weight is <x_i,x_j>/<xi,xi>-alpha data is a matrix of data which is assumed to be properly normalized alpha parameter is a signle value in this case top_edges is the number of edges to return. if top_edges is negative then all edges will be returned """ # Number of points per 1 loop. 20k assumes approx 3.2GB nP = np.min([2000, len(data)]) n = len(data) leng = np.zeros((n, 1)) # globalxy = np.zeros((n,n)) insep_edges = [] weights = [] symmetric_graph = True symmetry_message = False for k in range(0, n, nP): e = k + nP if e > n: e = n # Calculate diagonal part, divide each row by diagonal element xy = data[k:e, :] @ data[k:e, :].T leng[k:e] = np.diag(xy)[:, None] xy = xy - np.diag(leng[k:e].squeeze()) xy = xy / leng[k:e] # if skdim.lid.FisherS.check_symmetric(xy): if np.allclose(xy, xy.T, rtol=1e-05, atol=1e-08): # globalxy[k:e,k:e] = np.triu(xy) for i in range(len(xy)): for j in range(i + 1, len(xy)): if xy[i, j] > alpha: insep_edges.append((k + i, k + j)) weights.append(xy[i, j] - alpha) else: symmetric_graph = False # globalxy[k:e,k:e] = xy for i in range(len(xy)): for j in range(i + 1, len(xy)): if xy[i, j] > alpha: insep_edges.append((k + i, k + j)) weights.append(xy[i, j] - alpha) # Calculate nondiagonal part startpoint = 0 if symmetric_graph: startpoint = k if not symmetry_message: print( "Graph is symmetric, only upper triangle of the separability matrix will be used" ) symmetry_message = True for kk in range(startpoint, n, nP): # Ignore diagonal part if not k == kk: ee = kk + nP if ee > n: ee = n xy = data[k:e, :] @ data[kk:ee, :].T xy = xy / leng[k:e] # globalxy[k:e,kk:ee] = xy for i in range(ee - kk): for j in range(ee - kk): if xy[i, j] > alpha: insep_edges.append((k + i, kk + j)) weights.append(xy[i, j] - alpha) weights = np.array(weights) if top_edges > 0: if top_edges < len(insep_edges): weights_sorted = np.sort(weights) weights_sorted = weights_sorted[::-1] thresh = weights_sorted[top_edges] insep_edges_filtered = [] weights_filtered = [] for i, w in enumerate(weights): if w > thresh: insep_edges_filtered.append(insep_edges[i]) weights_filtered.append(w) weights = np.array(weights_filtered) insep_edges = insep_edges_filtered return insep_edges, weights
[docs] @staticmethod def check_symmetric(a, rtol=1e-05, atol=1e-08): return np.allclose(a, a.T, rtol=rtol, atol=atol)
def _SeparabilityAnalysis(self, X): """ %Performs standard analysis of separability and produces standard plots. % %Inputs: % X - is a data matrix with one data point in each row. % Optional arguments in varargin form Name, Value pairs. Possible names: % 'conditional_number' - a positive real value used to select the top % princinpal components. We consider only PCs with eigen values % which are not less than the maximal eigenvalue divided by % conditional_number Default value is 10. % 'project_on_sphere' - a boolean value indicating if projecting on a % sphere should be performed. Default value is true. % 'Alphas' - a real vector, with alpha range, the values must be given increasing % within (0,1) interval. Default is [0.6,0.62,...,0.98]. % 'produce_plots' - a boolean value indicating if the standard plots % need to be drawn. Default is true. % 'verbose' - bool, whether to print number of retained principal components % 'limit_maxdim' bool, whether to cap estimated maxdim to the embedding dimension %Outputs: % n_alpha - effective dimension profile as a function of alpha % n_single - a single estimate for the effective dimension % p_alpha - distributions as a function of alpha, matrix with columns % corresponding to the alpha values, and with rows corresponding to % objects. % separable_fraction - separable fraction of data points as a function of % alpha % alphas - alpha values """ npoints = len(X[:, 0]) # Preprocess data Xp = self._preprocessing(X, 1, 1, 1) # Check separability separable_fraction, p_alpha = self._checkSeparabilityMultipleAlpha(Xp) # Calculate mean fraction of separable points for each alpha. py_mean = np.mean(p_alpha, axis=1) n_alpha, n_single, alpha_single = self._dimension_uniform_sphere(py_mean) alpha_ind_selected = np.where(n_single == n_alpha)[0] if self.limit_maxdim: n_single = np.clip(n_single, None, X.shape[1]) if self.produce_plots: # Define the minimal and maximal dimensions for theoretical graph with # two dimensions in each side n_min = np.floor(min(n_alpha)) - 2 n_max = np.floor(max(n_alpha) + 0.8) + 2 if n_min < 1: n_min = 1 ns = np.arange(n_min, n_max + 1) plt.figure() plt.plot(self._alphas[0, :], n_alpha, "ko-") plt.plot( self._alphas[0, alpha_ind_selected], n_single, "rx", markersize=16, ) plt.xlabel("\u03B1", fontsize=16) plt.ylabel("Effective dimension", fontsize=16) locs, labels = plt.xticks() plt.show() nbins = int(round(np.floor(npoints / 200))) if nbins < 20: nbins = 20 plt.figure() plt.hist(p_alpha[alpha_ind_selected, :][0], bins=nbins) plt.xlabel( "Inseparability prob. for \u03B1=%2.2f" % (self._alphas[0, alpha_ind_selected]), fontsize=16, ) plt.ylabel("Number of values") plt.show() plt.figure() plt.xticks(locs, labels) pteor = np.zeros((len(ns), len(self._alphas[0, :]))) for k in range(len(ns)): for j in range(len(self._alphas[0, :])): pteor[k, j] = self._probability_inseparable_sphere( self._alphas[0, j], ns[k] ) for i in range(len(pteor[:, 0])): plt.semilogy(self._alphas[0, :], pteor[i, :], "-", color="r") plt.xlim(min(self._alphas[0, :]), 1) if True in np.isnan(n_alpha): plt.semilogy( self._alphas[0, : np.where(np.isnan(n_alpha))[0][0]], py_mean[: np.where(np.isnan(n_alpha))[0][0]], "bo-", "LineWidth", 3, ) else: plt.semilogy(self._alphas[0, :], py_mean, "bo-", "LineWidth", 3) plt.xlabel("\u03B1") plt.ylabel("Mean inseparability prob.", fontsize=16) plt.title("Theor.curves for n=%i:%i" % (n_min, n_max)) plt.show() if len(n_single) > 1: print( "FisherS selected several dimensions as equally probable. Taking the maximum" ) return ( n_alpha, n_single.max(), p_alpha, self._alphas, separable_fraction, Xp, )