K-Means with Spark

Objective

The objective of this hands-on exercise is to let you reason about how the K-Means clustering algorithm can be parallelized using Spark.

For the sake of simplicity we have prepared a Keystone Docker container with all the software and material required for the exercise. If you want to use it, install Docker and execute the following command:

$ docker run -it dxlab/keystone

You will find the exercise material in the spark folder.

$ cd hands-on/spark

Getting started with Spark

Start by launching Spark's Python shell:

$ pyspark

K-means on Spark

We are going to use MLlib, the machine learning module of Spark, which is designed to run machine learning algorithms on numerical data sets represented as RDDs. Because it works on RDDs, MLlib interoperates with the other components of Spark. When the data are not numerical, MLlib requires additional data types such as vectors, produced by transformation algorithms that turn text into numerical vectors (package pyspark.mllib).
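
As a small illustration of these numerical types (a sketch only; the vectors below are invented for the example and assume the pyspark shell where sc is already defined):

from pyspark.mllib.linalg import Vectors

# a dense vector holding three numerical features
v = Vectors.dense([0.0, 1.0, 2.0])

# MLlib algorithms expect an RDD of such numerical records
vectors = sc.parallelize([Vectors.dense([0.0, 0.0, 0.0]), Vectors.dense([9.0, 9.0, 9.0])])
print(vectors.first())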

The MLlib implementation of k-means corresponds to the algorithm known as k-means||, a parallel version of the original one. The method header is defined as follows:

KMeans.train(data, k, maxIterations, initializationMode, runs)

  • data: the RDD of points to cluster
  • k: number of desired clusters
  • maxIterations: maximum number of iterations the algorithm will perform; more iterations can improve the result, but the execution time increases
  • initializationMode: type of initialization of the algorithm ('random' or 'k-means||')
  • runs: number of times to execute the algorithm

Since k-means is not guaranteed to find an optimal solution, it can be executed several times on the same data set; the algorithm then returns the best solution found among those runs.
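
As a sketch of this idea (the names below are illustrative: points stands for an RDD of numerical records such as the parsedData RDD built later in this exercise, and computeCost is assumed to be available in your MLlib version):

from pyspark.mllib.clustering import KMeans

# train two independent models with different seeds and keep the better one
model_a = KMeans.train(points, 2, maxIterations=10, initializationMode='random', seed=1)
model_b = KMeans.train(points, 2, maxIterations=10, initializationMode='random', seed=2)

# computeCost returns the sum of squared distances of the points to their centers
best = model_a if model_a.computeCost(points) <= model_b.computeCost(points) else model_b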

Creating an RDD

MLlib functions work on RDDs, so the first step is to create one. We use a simple file named example.txt with the following content:

0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

Now load the data into Spark as an RDD object using the following commands:

file = 'example.txt'
data = sc.textFile(file)

You can inspect the content of the RDD object using the following commands:

def show(x): 
    print(x)

data.foreach(show)
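
Keep in mind that foreach runs show on the workers, so in a distributed deployment the output appears in the executor logs; in the local pyspark shell it is printed directly. An alternative sketch that brings a few records back to the driver instead:

for line in data.take(3):   # fetch the first three records to the driver
    print(line)

print(data.count())         # total number of records in the RDD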

Converting text to numerical values

As mentioned before, MLlib algorithms work on numerical values. In this example we convert each line of example.txt into an array of floats:

from numpy import array

parsedData = data.map(
    lambda line: array([float(x) for x in line.split(' ')])
).cache() 

parsedData.foreach(show)

Training the algorithm

In order to create a model that groups the data, we need to import the package pyspark.mllib.clustering, which contains the K-Means algorithm. Next we train a model that partitions the data into as many clusters as indicated by k.

from pyspark.mllib.clustering import KMeans
clusters = KMeans.train(parsedData, 2, maxIterations=10, runs=10, initializationMode='random')

Note that the variable clusters holds a KMeansModel object, which provides the method predict(newVal): given a new point newVal, it returns the index of the cluster the point belongs to according to the trained model.
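
For example, a quick check of the learned centers and a prediction on new points (a sketch reusing the clusters model above; the points are illustrative values):

from numpy import array

print(clusters.centers)                             # the learned cluster centers
print(clusters.predict(array([0.25, 0.25, 0.25])))  # index of the closest cluster
print(clusters.predict(array([9.05, 9.05, 9.05])))  # should fall in the other cluster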

Evaluating the algorithm

The last step is to evaluate the obtained model and determine whether it is representative of the available data. Recall that the objective of the algorithm is to minimize the Euclidean distance between every point and the center of its cluster. One way to quantify this is the error measure known as the Within Set Sum of Squared Error (WSSSE), which can be computed as follows:

from math import sqrt

def error(point): 
    center = clusters.centers[clusters.predict(point)] 
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print('Within Set Sum of Squared Error = ' + str(WSSSE))
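
Since the WSSSE generally decreases as k grows, one informal way to choose k is to compute it for several values and look for the point where the improvement levels off. A sketch reusing parsedData (the values of k are illustrative):

from math import sqrt
from pyspark.mllib.clustering import KMeans

def wssse(model, points):
    # sum of the distances of every point to the center of its assigned cluster
    def dist(point):
        center = model.centers[model.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))
    return points.map(dist).reduce(lambda x, y: x + y)

for k in (2, 3, 4):   # illustrative values of k
    model = KMeans.train(parsedData, k, maxIterations=10, initializationMode='random')
    print('k = %d, WSSSE = %f' % (k, wssse(model, parsedData)))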

Further reading

  • Read more about streaming k-means in the Spark documentation.
  • Generate your own visualizations of streaming clustering and explore the range of settings and behaviours by checking out the code in the spark-ml-streaming package.