K-Means is an unsupervised learning algorithm. Given some data points, and a number of clusters k, k-means aims to partition the data points into k clusters, so as to minimize the within-cluster sum of squares (i.e. variance).
Implementation from scratch
import numpy as np
from numpy import ndarray
class KMeans:
def __init__(self, k: int, num_iter: int = 100, tol: float = 1e-5):
if k < 1:
raise ValueError("k must be greater than 0")
if num_iter < 1:
raise ValueError("num_iter must be greater than 0")
if tol < 0:
raise ValueError("tol must be non negative")
self.k = k
self.num_iter = num_iter
self.tol = tol
self.centroids = None
def _check_X(self, X: ndarray):
if len(X.shape) != 2:
raise ValueError("X must be of shape (n_samples, n_features)")
def fit(self, X: ndarray, seed: int = None):
self._check_X(X)
n_samples = X.shape[0]
if n_samples < self.k:
raise ValueError(
f"Number of samples ({n_samples}) must be greater "
f"than or equal to the number of clusters (k={self.k})."
)
# Init centorids
if seed is not None:
np.random.seed(seed)
self.centroids = X[np.random.choice(n_samples, self.k, replace=False)]
new_centroids, labels = None, None
for _ in range(self.num_iter):
labels = self._assign_clusters(X)
new_centroids = self._compute_centroids(X, labels)
if np.linalg.norm(
self.centroids - new_centroids
) < self.tol:
break
self.centroids = new_centroids
return self
def predict(self, X: ndarray) -> ndarray:
self._check_X(X)
if self.centroids is None:
raise RuntimeError("Must call fit() first")
return self._assign_clusters(X)
def _assign_clusters(self, X: ndarray) -> ndarray:
distances = np.linalg.norm(
X[:, np.newaxis] - self.centroids,
axis=2
)
labels = np.argmin(distances, axis=1)
return labels
def _compute_centroids(
self,
X: ndarray,
labels: ndarray
) -> ndarray:
return np.array([
X[labels == i].mean(axis=0)
for i in range(self.k)
])
Unit test:
def test_kmeans():
k = 1
X = np.random.rand(1, 1)
kmeans = KMeans(k)
output = kmeans.fit(X).predict(X)
expected_output = np.array([0])
np.testing.assert_equal(output, expected_output)
k = 2
X = np.random.rand(1, 1)
kmeans = KMeans(k)
with np.testing.assert_raises(ValueError):
kmeans.fit(X).predict(X)
k = 2
X = np.array([[1, 1], [2, 2], [101, 101], [102, 102]])
kmeans = KMeans(k)
output = kmeans.fit(X).predict(X)
assert output[0] == output[1]
assert output[2] == output[3]
assert output[0] != output[2]
assert len(set(output)) == k
test_kmeans()