from typing import List
import hdbscan
import numpy as np
import pandas as pd
import umap.umap_ as umap
from scipy.stats import t
from sklearn.cluster import KMeans # For Kmeans clustering
from sklearn.decomposition import PCA
def reduce_dimensions_pca(embeddings: List[List[float]],
                          dimensions: int = 80) -> List[List[float]]:
    """
    Project the embeddings onto their leading principal components.

    A new ``PCA`` model is fitted on ``embeddings`` on every call and the
    same data is transformed with it.

    :param embeddings: list of lists size m*n where n is the number of
                       dimensions.
    :type embeddings: List[List[float]]
    :param dimensions: number of principal components to keep (passed to
                       ``PCA`` as ``n_components``).
    :type dimensions: int
    :return: the reduced data as returned by ``PCA.fit_transform``
             (m rows, ``dimensions`` columns).
    :rtype: List[List[float]]
    """
    return PCA(n_components=dimensions).fit_transform(embeddings)
def reduce_dimensions_umap(embeddings: List[List[float]],
                           dimensions: int = 80,
                           n_neighbors: int = 10) -> List[List[float]]:
    """
    Embed the vectors in a lower-dimensional space with UMAP.

    ``min_dist`` is fixed at 0.0 and ``random_state`` at 42; only the
    target dimensionality and the neighbourhood size are configurable.

    :param embeddings: list of lists size m*n where n is the number of
                       dimensions.
    :type embeddings: List[List[float]]
    :param dimensions: number of components to keep (passed to UMAP as
                       ``n_components``).
    :type dimensions: int
    :param n_neighbors: value forwarded to UMAP's ``n_neighbors`` argument.
    :type n_neighbors: int
    :return: the reduced data as returned by ``UMAP.fit_transform``
             (m rows, ``dimensions`` columns).
    :rtype: List[List[float]]
    """
    reducer = umap.UMAP(n_neighbors=n_neighbors,
                        min_dist=0.0,
                        n_components=dimensions,
                        random_state=42)
    return reducer.fit_transform(embeddings)
def shuffle(df: pd.DataFrame, random_state=None) -> pd.DataFrame:
    """
    Shuffles the values of every column of a pandas dataframe independently.

    Each column is permuted on its own, so a column keeps exactly the same
    values while the row-wise pairing between columns is broken.

    :param df: pandas dataframe shaped m*n
    :type df: pd.DataFrame
    :param random_state: optional seed (or numpy ``RandomState`` /
                         ``Generator``) that makes the shuffle reproducible.
                         ``None`` (the default) keeps the previous
                         unseeded behaviour.
    :type random_state: int, np.random.RandomState, np.random.Generator
                        or None
    :return: pandas dataframe shaped m*n with every column shuffled.
    :rtype: pd.DataFrame
    """
    if random_state is None or isinstance(
            random_state, (np.random.RandomState, np.random.Generator)):
        rng = random_state
    else:
        # Build ONE generator and share it across columns. Handing the raw
        # seed to every ``sample`` call would apply the same permutation to
        # each column and leave the rows intact.
        rng = np.random.RandomState(random_state)
    # ``.values`` drops the shuffled index so pandas does not realign the
    # permuted values back into their original order.
    return df.apply(
        lambda column: column.sample(frac=1, random_state=rng).values)
def single_sample_t_test(sample: np.array,
                         population_stat: float = 0.0) -> float:
    """
    Run a one-sample t test on a sample to see if its mean is
    significantly different from the population mean.

    The statistic is ``(mean - population_stat) / (s / sqrt(n))`` where
    ``s`` is the sample standard deviation (``ddof=1``).

    :param sample: array-like of floats (numpy array, pandas Series, list).
    :type sample: np.array
    :param population_stat: float for the population mean.
    :type population_stat: float
    :return: float for the t statistic. The result is ``nan`` or ``inf``
             when the standard error is zero or undefined (constant
             sample, fewer than two observations).
    :rtype: float
    """
    values = np.asarray(sample, dtype=float)
    # ddof=1 is required for a t statistic. The bare ``.std()`` of a numpy
    # array defaults to ddof=0, whereas a pandas Series defaults to ddof=1;
    # being explicit makes both input types agree.
    standard_error = values.std(ddof=1) / np.sqrt(values.size)
    return float((values.mean() - population_stat) / standard_error)
def calc_perm_variance(pca,
                       embeddings_df: pd.DataFrame,
                       n_simulations: int = 5) -> pd.DataFrame:
    """
    Calculates the variance explained for a PCA of the permuted
    data.

    On every simulation the columns of ``embeddings_df`` are shuffled
    with ``shuffle`` and a PCA is fitted on the result.

    :param pca: sklearn pca object whose configuration is used for the
                permutation fits. It is cloned, so the object passed in is
                never refitted. ``None`` falls back to
                ``PCA(svd_solver='full')``.
    :param embeddings_df: a pandas dataframe of the embedding vectors (m*n).
    :type embeddings_df: pd.DataFrame
    :param n_simulations: integer for the number of permutations to run it on.
    :type n_simulations: int
    :return: pandas dataframe with one row per principal component and one
             column per simulation, holding the explained variance ratio.
    :rtype: pd.DataFrame
    """
    if pca is None:
        estimator = PCA(svd_solver='full')
    else:
        # Imported locally so the block does not rely on a new file-level
        # import; sklearn itself is already a dependency of this module.
        from sklearn.base import clone
        # An unfitted copy keeps the caller's fitted model untouched.
        estimator = clone(pca)

    permuted_variance = []
    for _ in range(n_simulations):
        estimator.fit(shuffle(embeddings_df))
        permuted_variance.append(estimator.explained_variance_ratio_)
    # Rows of the list are simulations; transpose so rows are components.
    return pd.DataFrame(permuted_variance).transpose()
def get_optimal_n_components(embeddings: List[List[float]],
                             n_simulations: int = 5,
                             alpha: float = 0.05) -> int:
    """
    Calculates the optimal number of principal components to
    keep in a dimension reduction situation.

    A 'noise' baseline is built by permuting the variables to remove any
    correlations and computing the variance explained by the principal
    components of the permuted data. Each component of the real data is
    then compared with that baseline using a t test, in order, and the
    search stops at the first component that is not significantly
    different from the noise.

    :param embeddings: list of lists (m*n) of floats. where m is the
                       number of vectors, and n the number of variables
                       in each vector.
    :type embeddings: List[List[float]]
    :param n_simulations: the number of permutations to get a distribution
                          of the noise. Must be at least 2.
    :type n_simulations: int
    :param alpha: p-value above which a component is treated as noise.
    :type alpha: float
    :return: integer for the optimal number of principal components, i.e.
             the index of the first component that is not significant, or
             the total number of components when all of them are.
    :rtype: int
    :raises ValueError: if ``n_simulations`` is smaller than 2.
    """
    if n_simulations < 2:
        # One permutation leaves zero degrees of freedom, so no p-value
        # can be computed.
        raise ValueError("n_simulations must be at least 2 to run a t test")

    embeddings_df = pd.DataFrame(embeddings)
    pca = PCA(svd_solver='full')
    pca.fit(embeddings)
    observed_variance = pca.explained_variance_ratio_
    permuted_variance_df = calc_perm_variance(pca, embeddings_df,
                                              n_simulations)

    degrees_freedom = n_simulations - 1  # n - 1
    for component, observed in enumerate(observed_variance):
        t_stat = single_sample_t_test(permuted_variance_df.iloc[component],
                                      observed)
        # Lower-tail probability: small when the permuted variance sits
        # well below the observed variance of this component.
        p_value = t.cdf(t_stat, df=degrees_freedom)
        if p_value > alpha:
            return component
    # Every component beat the noise baseline; keep them all instead of
    # failing on an exhausted search.
    return len(observed_variance)
def _elbow_n_clusters(inertia: List[float]) -> int:
    """Return the cluster count at the strongest elbow of the inertia curve.

    ``inertia[k]`` must be the inertia of the model fitted with ``k + 1``
    clusters. Falls back to 2 when no elbow is found.
    """
    n_clusters = 2
    max_relative_strength = 0
    for n in range(2, len(inertia)):
        # Drop in inertia when going to n clusters, and when going on to
        # n + 1 clusters.
        d_0 = inertia[n - 2] - inertia[n - 1]
        d_1 = inertia[n - 1] - inertia[n]
        d2_1 = d_0 - d_1
        # Elbow strength, clamped at zero and scaled down by n.
        s_0 = max(0, d2_1 - d_1)
        relative_s = s_0 / n
        if relative_s > max_relative_strength:
            n_clusters = n
            max_relative_strength = relative_s
    return n_clusters


def kmeans_clustering(reduced: List[List[float]],
                      num_clusters: int = -1,
                      max_num_clusters: int = 75,
                      random_state: int = 42) -> List[int]:
    """
    This function calculates clusters based on the reduced vectors.

    When ``num_clusters`` is greater than 1 that many clusters are fitted
    directly. Otherwise it also calculates the best number of clusters
    using the elbow method, fitting one model for every cluster count from
    1 up to (but excluding) ``max_num_clusters``.

    :param reduced: list of lists (m*n) of floats. where m is the number of
                    vectors, and n the number of variables in each vector.
    :type reduced: List[List[float]]
    :param num_clusters: number of clusters to use; any value of 1 or less
                         (default -1) triggers the elbow search.
    :type num_clusters: int
    :param max_num_clusters: integer for the upper bound number of clusters.
                             The search never tries more clusters than
                             there are vectors.
    :type max_num_clusters: int
    :param random_state: seed passed to every KMeans model.
    :type random_state: int
    :return: cluster numbers for each element in reduced, as returned by
             ``KMeans.fit_predict``.
    :rtype: List[int]
    """
    vectors = reduced
    if num_clusters > 1:
        kmeans = KMeans(n_clusters=num_clusters,
                        init='k-means++',
                        random_state=random_state)
        return kmeans.fit_predict(vectors)

    # KMeans cannot fit more clusters than samples, so cap the scan.
    n_samples = len(vectors)
    scan_stop = min(max_num_clusters, n_samples + 1)

    # Inertia (cluster dispersion) for each model with k clusters.
    # Inertia reference: https://stats.stackexchange.com
    # /questions/78313/clustering-inertia-formula-in-scikit-learn
    inertia = [
        KMeans(n_clusters=k, init='k-means++',
               random_state=random_state).fit(vectors).inertia_
        for k in range(1, scan_stop)
    ]

    # The elbow fallback of 2 may still exceed the number of samples.
    n_clusters = max(1, min(_elbow_n_clusters(inertia), n_samples))

    # Fit the model with the optimal number of clusters, seeded like the
    # scan above so repeated calls give the same labels.
    kmeans = KMeans(n_clusters=n_clusters,
                    init='k-means++',
                    n_init=1,
                    max_iter=200,
                    random_state=random_state)
    return kmeans.fit_predict(vectors)
def hdbscan_clustering(reduced: List[List[float]],
                       min_cluster_size: int = 4,
                       allow_single_cluster: bool = False) -> List[int]:
    """
    Cluster the reduced vectors with HDBSCAN.

    :param reduced: list of lists (m*n) of floats. where m is the number of
                    vectors, and n the number of variables in each vector.
    :type reduced: List[List[float]]
    :param min_cluster_size: value forwarded to HDBSCAN's
                             ``min_cluster_size`` argument.
    :type min_cluster_size: int
    :param allow_single_cluster: value forwarded to HDBSCAN's
                                 ``allow_single_cluster`` argument.
    :type allow_single_cluster: bool
    :return: cluster numbers for each element in reduced, as returned by
             ``HDBSCAN.fit_predict`` (note that -1 is an outlier).
    :rtype: List[int]
    """
    return hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        allow_single_cluster=allow_single_cluster,
    ).fit_predict(reduced)