KMeans

KMeans class is an implementation of the K-means clustering algorithm in machine learning with support for k-means|| (aka k-means parallel) in Spark MLlib.

Roughly, k-means is an unsupervised iterative algorithm that groups input data in a predefined number of k clusters. Each cluster has a centroid which is a cluster center. It is a highly iterative machine learning algorithm that measures the distance (between a vector and centroids) as the nearest mean. The algorithm steps are repeated till the convergence of a specified number of steps.

Note
K-Means algorithm uses Lloyd’s algorithm in computer science.

It is an Estimator that produces a KMeansModel.

Tip
Do import org.apache.spark.ml.clustering.KMeans to work with KMeans algorithm.

KMeans defaults to use the following values:

  • Number of clusters or centroids (k): 2

  • Maximum number of iterations (maxIter): 20

  • Initialization algorithm (initMode): k-means||

  • Number of steps for the k-means|| (initSteps): 5

  • Convergence tolerance (tol): 1e-4

import org.apache.spark.ml.clustering._
val kmeans = new KMeans()

scala> println(kmeans.explainParams)
featuresCol: features column name (default: features)
initMode: initialization algorithm (default: k-means||)
initSteps: number of steps for k-means|| (default: 5)
k: number of clusters to create (default: 2)
maxIter: maximum number of iterations (>= 0) (default: 20)
predictionCol: prediction column name (default: prediction)
seed: random seed (default: -1689246527)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-4)

KMeans assumes that featuresCol is of type VectorUDT and appends predictionCol of type IntegerType.

Internally, fit method "unwraps" the feature vector in featuresCol column in the input DataFrame and creates an RDD[Vector]. It then hands the call over to the MLlib variant of KMeans in org.apache.spark.mllib.clustering.KMeans. The result is copied to KMeansModel with a calculated KMeansSummary.

Each item (row) in a data set is described by a numeric vector of attributes called features. A single feature (a dimension of the vector) represents a word (token) with a value that is a metric that defines the importance of that word or term in the document.

Tip

Enable INFO logging level for org.apache.spark.mllib.clustering.KMeans logger to see what happens inside a KMeans.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.mllib.clustering.KMeans=INFO

Refer to Logging.

KMeans Example

You can represent a text corpus (document collection) using the vector space model. In this representation, the vectors have dimension that is the number of different words in the corpus. It is quite natural to have vectors with a lot of zero values as not all words will be in a document. We will use an optimized memory representation to avoid zero values using sparse vectors.

This example shows how to use k-means to classify emails as a spam or not.

// NOTE Don't copy and paste the final case class with the other lines
// It won't work with paste mode in spark-shell
final case class Email(id: Int, text: String)

val emails = Seq(
  "This is an email from your lovely wife. Your mom says...",
  "SPAM SPAM spam",
  "Hello, We'd like to offer you").zipWithIndex.map(_.swap).toDF("id", "text").as[Email]

// Prepare data for k-means
// Pass emails through a "pipeline" of transformers
import org.apache.spark.ml.feature._
val tok = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("tokens")
  .setPattern("\\W+")

val hashTF = new HashingTF()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setNumFeatures(20)

val preprocess = (tok.transform _).andThen(hashTF.transform)

val features = preprocess(emails.toDF)

scala> features.select('text, 'features).show(false)
+--------------------------------------------------------+------------------------------------------------------------+
|text                                                    |features                                                    |
+--------------------------------------------------------+------------------------------------------------------------+
|This is an email from your lovely wife. Your mom says...|(20,[0,3,6,8,10,11,17,19],[1.0,2.0,1.0,1.0,2.0,1.0,2.0,1.0])|
|SPAM SPAM spam                                          |(20,[13],[3.0])                                             |
|Hello, We'd like to offer you                           |(20,[0,2,7,10,11,19],[2.0,1.0,1.0,1.0,1.0,1.0])             |
+--------------------------------------------------------+------------------------------------------------------------+

import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans

scala> val kmModel = kmeans.fit(features.toDF)
16/04/08 15:57:37 WARN KMeans: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
16/04/08 15:57:37 INFO KMeans: Initialization with k-means|| took 0.219 seconds.
16/04/08 15:57:37 INFO KMeans: Run 0 finished in 1 iterations
16/04/08 15:57:37 INFO KMeans: Iterations took 0.030 seconds.
16/04/08 15:57:37 INFO KMeans: KMeans converged in 1 iterations.
16/04/08 15:57:37 INFO KMeans: The cost for the best run is 5.000000000000002.
16/04/08 15:57:37 WARN KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
kmModel: org.apache.spark.ml.clustering.KMeansModel = kmeans_7a13a617ce0b

scala> kmModel.clusterCenters.map(_.toSparse)
res36: Array[org.apache.spark.mllib.linalg.SparseVector] = Array((20,[13],[3.0]), (20,[0,2,3,6,7,8,10,11,17,19],[1.5,0.5,1.0,0.5,0.5,0.5,1.5,1.0,1.0,1.0]))

val email = Seq("hello mom").toDF("text")
val result = kmModel.transform(preprocess(email))

scala> .show(false)
+---------+------------+---------------------+----------+
|text     |tokens      |features             |prediction|
+---------+------------+---------------------+----------+
|hello mom|[hello, mom]|(20,[2,19],[1.0,1.0])|1         |
+---------+------------+---------------------+----------+

results matching ""

    No results matching ""