bob / bob.learn.em · Merge request !46
Merged
Improvements to the new API and docs
Amir MOHAMMADI requested to merge api-fixes into master · 3 years ago
- Added pre-commit checks
- Dropped old dependencies
- Brought all classes and functions to bob.learn.em
- Made KMeans.centroids_ available as KMeans.means
- Implemented a load method for GMMMachine

Fixes #35 (closed)
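A minimal sketch of the renamed attribute (illustrative only: the data values are made up, and the import path assumes the "brought all classes and functions to bob.learn.em" item above applies to the KMeansMachine class shown in the diff below):

import numpy as np
from bob.learn.em import KMeansMachine

data = np.array([[0.0, 0.0], [0.5, 0.0], [10.0, 10.0], [10.5, 10.0]])
machine = KMeansMachine(n_clusters=2).fit(data)
# `means` is an alias of the fitted `centroids_` attribute.
assert np.array_equal(machine.means, machine.centroids_)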
Merge request reports

Compare: version 1 (5ba9babc, 1 commit) and version 2 (9d708ffe, 1 commit); latest version d51aee8c (2 commits, 3 years ago).
1 file: +0 −314
bob/learn/em/cluster/k_means.py deleted (100644 → 0) +0 −314
#!/usr/bin/env python
# @author: Yannick Dayer <yannick.dayer@idiap.ch>
# @date: Tue 27 Jul 2021 11:04:10 UTC+02

import logging
from typing import Tuple, Union

import dask.array as da
import numpy as np
from dask_ml.cluster.k_means import k_init
from sklearn.base import BaseEstimator

logger = logging.getLogger(__name__)


class KMeansMachine(BaseEstimator):
    """Stores the k-means clusters parameters (centroid of each cluster).

    Allows the clustering of data with the ``fit`` method.

    The training works in two phases:

    - An initialization (setting the initial values of the centroids)
    - An e-m loop reducing the total distance between the data points and their
      closest centroid.

    The initialization can use an iterative process to find the best set of
    coordinates, use random starting points, or take specified coordinates. The
    ``init_method`` parameter specifies which of these behaviors is used.

    Attributes
    ----------
    centroids_: ndarray of shape (n_clusters, n_features)
        The current clusters centroids. Available after fitting.
    """
    def __init__(
        self,
        n_clusters: int,
        init_method: Union[str, np.ndarray] = "k-means||",
        convergence_threshold: float = 1e-5,
        max_iter: int = 20,
        random_state: Union[int, np.random.RandomState] = 0,
        init_max_iter: Union[int, None] = None,
    ) -> None:
        """
        Parameters
        ----------
        n_clusters: int
            The number of represented clusters.
        init_method:
            One of "random", "k-means++", or "k-means||", or an array with the
            wanted starting values of the centroids.
        convergence_threshold:
            The relative change of the average minimal distance under which
            training is considered converged.
        max_iter:
            The maximum number of iterations for the e-m part.
        random_state:
            A seed or RandomState used for the initialization part.
        init_max_iter:
            The maximum number of iterations for the initialization part.
        """
        if n_clusters < 1:
            raise ValueError("The number of clusters should be greater than 0.")
        self.n_clusters = n_clusters
        self.init_method = init_method
        self.convergence_threshold = convergence_threshold
        self.max_iter = max_iter
        self.random_state = random_state
        self.init_max_iter = init_max_iter
        self.average_min_distance = np.inf
        self.zeroeth_order_statistics = None
        self.first_order_statistics = None
        self.centroids_ = None
    @property
    def means(self) -> np.ndarray:
        """An alias for `centroids_`."""
        return self.centroids_

    @means.setter
    def means(self, value: np.ndarray):
        self.centroids_ = value
    def get_centroids_distance(self, x: np.ndarray) -> np.ndarray:
        """Returns the distance values between x and each cluster's centroid.

        The returned values are squared Euclidean distances.

        Parameters
        ----------
        x: ndarray of shape (n_features,) or (n_samples, n_features)
            One data point, or a series of data points.

        Returns
        -------
        distances: ndarray of shape (n_clusters,) or (n_clusters, n_samples)
            For each cluster, the squared Euclidean distance (or distances) to x.
        """
        return np.sum((self.centroids_[:, None] - x[None, :]) ** 2, axis=-1)
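    # Shape sketch for the broadcasting in get_centroids_distance above
    # (illustrative values, not from the original source): with centroids_ of
    # shape (3, 2) and x of shape (5, 2), centroids_[:, None] - x[None, :]
    # broadcasts to (3, 5, 2); summing the squared differences over the last
    # axis yields the (3, 5) matrix of squared distances, one row per cluster.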
    def get_closest_centroid(self, x: np.ndarray) -> Tuple[int, float]:
        """Returns the closest mean's index and squared Euclidean distance to x."""
        dists = self.get_centroids_distance(x)
        min_id = np.argmin(dists, axis=0)
        min_dist = dists[min_id]
        return min_id, min_dist

    def get_closest_centroid_index(self, x: np.ndarray) -> np.ndarray:
        """Returns the index of the closest cluster mean to x."""
        return np.argmin(self.get_centroids_distance(x), axis=0)
    def get_min_distance(self, x: np.ndarray) -> np.ndarray:
        """Returns the smallest distance between each point and the cluster centroids.

        For each point in x, the distance to the closest cluster's mean is returned.

        The returned values are squared Euclidean distances.
        """
        return np.min(self.get_centroids_distance(x), axis=0)

    def __eq__(self, obj) -> bool:
        return self.is_similar_to(obj, r_epsilon=0, a_epsilon=0)
    def is_similar_to(self, obj, r_epsilon=1e-05, a_epsilon=1e-08) -> bool:
        if self.centroids_ is not None and obj.centroids_ is not None:
            return np.allclose(
                self.centroids_, obj.centroids_, rtol=r_epsilon, atol=a_epsilon
            )
        else:
            logger.warning(
                "KMeansMachine `centroids_` was not set. You should call 'fit' first."
            )
            return False
    def get_variances_and_weights_for_each_cluster(self, data: np.ndarray):
        """Returns the clusters variance and weight for data clustered by the machine.

        For each cluster, finds the subset of the samples that is closest to that
        centroid, and calculates:

        1) the variance of that subset (the cluster variance)
        2) the proportion of samples represented by that subset (the cluster weight)

        Parameters
        ----------
        data:
            The data to compute the variance of.

        Returns
        -------
        Tuple of arrays:
            variances: ndarray of shape (n_clusters, n_features)
                For each cluster, the variance in each dimension of the data.
            weights: ndarray of shape (n_clusters,)
                The weight (proportion of the data points) of each cluster.
        """
        n_cluster = self.n_clusters
        closest_centroid_indices = self.get_closest_centroid_index(data)
        weights_count = np.bincount(closest_centroid_indices, minlength=n_cluster)
        weights = weights_count / weights_count.sum()

        # FIX for `too many indices for array` error if using `np.eye(n_cluster)` alone:
        dask_compatible_eye = np.eye(n_cluster) * np.array(1, like=data)

        # Accumulate
        means_sum = np.sum(
            dask_compatible_eye[closest_centroid_indices][:, :, None] * data[:, None],
            axis=0,
        )
        variances_sum = np.sum(
            dask_compatible_eye[closest_centroid_indices][:, :, None]
            * (data[:, None] ** 2),
            axis=0,
        )

        # Reduce
        means = means_sum / weights_count[:, None]
        variances = (variances_sum / weights_count[:, None]) - (means**2)

        return variances, weights
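    # A sketch of the one-hot accumulation used above (illustrative values, not
    # from the original source): with n_cluster=2 and
    # closest_centroid_indices=[0, 1, 0], the row indexing
    # dask_compatible_eye[closest_centroid_indices] produces the one-hot matrix
    # [[1, 0], [0, 1], [1, 0]]. Adding a trailing axis and multiplying by
    # data[:, None] zeroes each sample out for every cluster except its own, so
    # the sum over axis 0 gives the per-cluster sums of x and x**2 in a single
    # vectorized, dask-friendly pass.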
    def initialize(self, data: np.ndarray):
        """Assigns the means to an initial value using a specified method or randomly."""
        logger.debug(f"Initializing k-means means with '{self.init_method}'.")
        # k_init requires da.Array as input.
        data = da.array(data)
        self.centroids_ = k_init(
            X=data,
            n_clusters=self.n_clusters,
            init=self.init_method,
            random_state=self.random_state,
            max_iter=self.init_max_iter,
        )
    def e_step(self, data: np.ndarray):
        closest_k_indices = self.get_closest_centroid_index(data)
        # Number of data points in each cluster
        self.zeroeth_order_statistics = np.bincount(
            closest_k_indices, minlength=self.n_clusters
        )
        # Sum of data points coordinates in each cluster
        self.first_order_statistics = np.sum(
            np.eye(self.n_clusters)[closest_k_indices][:, :, None] * data[:, None],
            axis=0,
        )
        self.average_min_distance = self.get_min_distance(data).mean()
    def m_step(self, data: np.ndarray):
        # New centroids: per-cluster coordinate sums divided by per-cluster counts.
        self.centroids_ = (
            self.first_order_statistics / self.zeroeth_order_statistics[:, None]
        )
    def fit(self, X, y=None):
        """Fits this machine on data samples."""
        logger.debug("Initializing trainer.")
        self.initialize(data=X)
        logger.info("Training k-means.")
        distance = np.inf
        step = 0
        while self.max_iter is None or step < self.max_iter:
            step += 1
            logger.info(
                f"Iteration {step:3d}"
                + (f"/{self.max_iter}" if self.max_iter else "")
            )
            distance_previous = distance
            self.e_step(data=X)
            self.m_step(data=X)

            # If we're running in dask, persist the centroids so we don't recompute them
            # from the start of the graph at every step.
            for attr in ("centroids_",):
                arr = getattr(self, attr)
                if isinstance(arr, da.Array):
                    setattr(self, attr, arr.persist())

            distance = float(self.average_min_distance)
            logger.info(f"Average minimal squared Euclidean distance = {distance}")

            if step > 1:
                convergence_value = abs(
                    (distance_previous - distance) / distance_previous
                )
                logger.info(f"Convergence value = {convergence_value}")

                # Terminates if converged (and threshold is set)
                if (
                    self.convergence_threshold is not None
                    and convergence_value <= self.convergence_threshold
                ):
                    logger.info("Reached convergence threshold. Training stopped.")
                    break
        else:
            logger.info("Reached maximum step. Training stopped without convergence.")
        self.compute()
        return self
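    # Convergence sketch for the loop above (illustrative numbers, not from the
    # original source): with distance_previous=2.0 and distance=1.99999, the
    # convergence value is abs((2.0 - 1.99999) / 2.0) = 5e-06, which is below
    # the default convergence_threshold of 1e-5, so training stops.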
    def partial_fit(self, X, y=None):
        """Applies one e-m step of k-means on the data."""
        if self.centroids_ is None:
            logger.debug("First call to 'partial_fit'. Initializing...")
            self.initialize(data=X)
        self.e_step(data=X)
        self.m_step(data=X)
        return self
    def transform(self, X):
        """Returns all the distances between the data and each cluster's mean.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)
            Series of data points.

        Returns
        -------
        distances: ndarray of shape (n_clusters, n_samples)
            For each mean, for each point, the squared Euclidean distance between them.
        """
        return self.get_centroids_distance(X)
    def predict(self, X):
        """Returns the labels of the closest cluster centroid to the data.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)
            Series of data points.

        Returns
        -------
        indices: ndarray of shape (n_samples,)
            The indices of the closest cluster for each data point.
        """
        return self.get_closest_centroid_index(X)
    def compute(self, *args, **kwargs):
        """Computes delayed arrays if needed."""
        for name in ("centroids_",):
            setattr(self, name, np.asarray(getattr(self, name)))
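For reference, a minimal usage sketch of the class above (illustrative only: the synthetic data, variable names, and the direct import from this since-deleted module path are assumptions, not part of the merge request):

import numpy as np
from bob.learn.em.cluster.k_means import KMeansMachine

# Two well-separated synthetic blobs in 3 dimensions.
rng = np.random.RandomState(0)
data = np.concatenate([rng.normal(0, 1, (50, 3)), rng.normal(8, 1, (50, 3))])

machine = KMeansMachine(n_clusters=2, random_state=0)
machine.fit(data)  # initialization + e-m loop until convergence or max_iter
labels = machine.predict(data)  # index of the closest centroid per sample
variances, weights = machine.get_variances_and_weights_for_each_cluster(data)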