K-Means is an unsupervised learning algorithm. Given some data points, and a number of clusters k, k-means aims to partition the data points into k clusters, so as to minimize the within-cluster sum of squares (i.e. variance).
Implementation from scratch
import numpy as np
from numpy import ndarray
class KMeans:
def __init__(
self, k: int, num_iter: int = 100, tol: float = 1e-5
):
if k < 1:
raise ValueError("k must be greater than 0")
if num_iter < 1:
raise ValueError("num_iter must be greater than 0")
if tol < 0:
raise ValueError("tol must be non negative")
self.k = k
self.num_iter = num_iter
self.tol = tol
self.centroids: ndarray | None = None
def _check_X(self, X: ndarray):
if len(X.shape) != 2:
raise ValueError(
"X must be of shape (n_samples, n_features)"
)
def fit(self, X: ndarray, seed: int | None = None):
self._check_X(X)
n_samples = X.shape[0]
if n_samples < self.k:
raise ValueError(
f"Number of samples ({n_samples}) must be "
"greater than or equal to the number of "
f"clusters (k={self.k})."
)
# Init centorids
if seed is not None:
np.random.seed(seed)
self.centroids = X[
np.random.choice(n_samples, self.k, replace=False)
]
new_centroids, labels = None, None
for _ in range(self.num_iter):
labels = self._assign_clusters(X)
new_centroids = self._compute_centroids(X, labels)
if (
np.linalg.norm(self.centroids - new_centroids)
< self.tol
):
break
self.centroids = new_centroids
return self
def predict(self, X: ndarray) -> ndarray:
self._check_X(X)
if self.centroids is None:
raise RuntimeError("Must call fit() first")
return self._assign_clusters(X)
def _assign_clusters(self, X: ndarray) -> ndarray:
distances = np.linalg.norm(
X[:, np.newaxis] - self.centroids, axis=2
)
labels = np.argmin(distances, axis=1)
return labels
def _compute_centroids(
self, X: ndarray, labels: ndarray
) -> ndarray:
return np.array(
[X[labels == i].mean(axis=0) for i in range(self.k)]
)
def test_kmeans():
k = 1
X = np.random.rand(1, 1)
kmeans = KMeans(k)
output = kmeans.fit(X).predict(X)
expected_output = np.array([0])
np.testing.assert_equal(output, expected_output)
k = 2
X = np.random.rand(1, 1)
kmeans = KMeans(k)
with np.testing.assert_raises(ValueError):
kmeans.fit(X).predict(X)
k = 2
X = np.array([[1, 1], [2, 2], [101, 101], [102, 102]])
kmeans = KMeans(k)
output = kmeans.fit(X).predict(X)
assert output[0] == output[1]
assert output[2] == output[3]
assert output[0] != output[2]
assert len(set(output)) == k
if __name__ == "__main__":
test_kmeans()
print("All tests passed.")