LinearRegression

LinearRegression is a Regressor that represents the linear regression algorithm in Machine Learning.

LinearRegression belongs to org.apache.spark.ml.regression package.

Tip
Read the scaladoc of LinearRegression.

It expects org.apache.spark.mllib.linalg.Vector as the input type of the features column in a dataset and produces a LinearRegressionModel.

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

The supported parameters (as reported by explainParams):

scala> println(lr.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (default: )
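
Parameters are set with the standard setter methods. A minimal sketch of tweaking regularization (the values are arbitrary, for illustration only):

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression()
  .setMaxIter(50)          // cap the number of iterations
  .setRegParam(0.1)        // regularization strength (regParam)
  .setElasticNetParam(0.5) // mix L1 and L2 penalties equally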

LinearRegression Example

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val data = (0.0 to 9.0 by 1)                      // create a collection of Doubles
  .map(n => (n, n))                               // make it pairs
  .map { case (label, features) =>
    LabeledPoint(label, Vectors.dense(features)) } // create labeled points of dense vectors
  .toDF                                           // make it a DataFrame

scala> data.show
+-----+--------+
|label|features|
+-----+--------+
|  0.0|   [0.0]|
|  1.0|   [1.0]|
|  2.0|   [2.0]|
|  3.0|   [3.0]|
|  4.0|   [4.0]|
|  5.0|   [5.0]|
|  6.0|   [6.0]|
|  7.0|   [7.0]|
|  8.0|   [8.0]|
|  9.0|   [9.0]|
+-----+--------+

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

val model = lr.fit(data)

scala> model.intercept
res1: Double = 0.0

scala> model.coefficients
res2: org.apache.spark.mllib.linalg.Vector = [1.0]
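
A fitted LinearRegressionModel also carries a training summary with common quality metrics, so you can inspect the fit without a separate evaluator. A quick sketch:

// training metrics computed as part of fit
val summary = model.summary
summary.rootMeanSquaredError // RMSE on the training data
summary.r2                   // coefficient of determination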

// make predictions
scala> val predictions = model.transform(data)
predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 1 more field]

scala> predictions.show
+-----+--------+----------+
|label|features|prediction|
+-----+--------+----------+
|  0.0|   [0.0]|       0.0|
|  1.0|   [1.0]|       1.0|
|  2.0|   [2.0]|       2.0|
|  3.0|   [3.0]|       3.0|
|  4.0|   [4.0]|       4.0|
|  5.0|   [5.0]|       5.0|
|  6.0|   [6.0]|       6.0|
|  7.0|   [7.0]|       7.0|
|  8.0|   [8.0]|       8.0|
|  9.0|   [9.0]|       9.0|
+-----+--------+----------+

import org.apache.spark.ml.evaluation.RegressionEvaluator

// rmse is the default metric
// We're explicit here for learning purposes
val regEval = new RegressionEvaluator().setMetricName("rmse")
val rmse = regEval.evaluate(predictions)

scala> println(s"Root Mean Squared Error: $rmse")
Root Mean Squared Error: 0.0
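
rmse is one of the metrics RegressionEvaluator supports; mse, r2, and mae are the others. Switching the metric is a one-liner:

// coefficient of determination; 1.0 means a perfect fit
val r2 = regEval.setMetricName("r2").evaluate(predictions)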

import org.apache.spark.mllib.linalg.DenseVector
// NOTE: This is the RDD-way; see the Dataset-based sketch below for the spark.ml-way
predictions.rdd.map { r =>
  (r(0).asInstanceOf[Double], r(1).asInstanceOf[DenseVector](0).toDouble, r(2).asInstanceOf[Double]) }
  .toDF("label", "feature0", "prediction").show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
|  0.0|     0.0|       0.0|
|  1.0|     1.0|       1.0|
|  2.0|     2.0|       2.0|
|  3.0|     3.0|       3.0|
|  4.0|     4.0|       4.0|
|  5.0|     5.0|       5.0|
|  6.0|     6.0|       6.0|
|  7.0|     7.0|       7.0|
|  8.0|     8.0|       8.0|
|  9.0|     9.0|       9.0|
+-----+--------+----------+

// Let's make it easier on the eyes using a Scala case class
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
case class Prediction(label: Double, feature0: Double, prediction: Double)
object Prediction {
  def apply(r: Row) = new Prediction(
    label = r(0).asInstanceOf[Double],
    feature0 = r(1).asInstanceOf[DenseVector](0).toDouble,
    prediction = r(2).asInstanceOf[Double])
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
defined class Prediction
defined object Prediction

scala> predictions.rdd.map(Prediction.apply).toDF.show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
|  0.0|     0.0|       0.0|
|  1.0|     1.0|       1.0|
|  2.0|     2.0|       2.0|
|  3.0|     3.0|       3.0|
|  4.0|     4.0|       4.0|
|  5.0|     5.0|       5.0|
|  6.0|     6.0|       6.0|
|  7.0|     7.0|       7.0|
|  8.0|     8.0|       8.0|
|  9.0|     9.0|       9.0|
+-----+--------+----------+
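
An alternative that skips the RDD detour altogether is to extract the first feature with a user-defined function and stay with DataFrames throughout. A minimal sketch (firstFeature is a name made up for this example):

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector
// extract the first element of the feature vector
val firstFeature = udf { v: Vector => v(0) }
predictions.select($"label", firstFeature($"features") as "feature0", $"prediction").show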

train Method

train(dataset: DataFrame): LinearRegressionModel

The (protected) train method of LinearRegression expects a dataset (a DataFrame) with two columns:

  1. label of type DoubleType.

  2. features of type Vector.

It returns LinearRegressionModel.

It first counts the number of features in the features column (as configured by featuresCol, usually features). The column has to be of mllib.linalg.Vector type (and can easily be prepared using the HashingTF transformer).

val spam = Seq(
  (0, "Hi Jacek. Wanna more SPAM? Best!"),
  (1, "This is SPAM. This is SPAM")).toDF("id", "email")

import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()
val spamTokens = regexTok.setInputCol("email").transform(spam)

scala> spamTokens.show(false)
+---+--------------------------------+---------------------------------------+
|id |email                           |regexTok_646b6bcc4548__output          |
+---+--------------------------------+---------------------------------------+
|0  |Hi Jacek. Wanna more SPAM? Best!|[hi, jacek., wanna, more, spam?, best!]|
|1  |This is SPAM. This is SPAM      |[this, is, spam., this, is, spam]      |
+---+--------------------------------+---------------------------------------+

import org.apache.spark.ml.feature.HashingTF
val hashTF = new HashingTF()
  .setInputCol(regexTok.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(5000)

val spamHashed = hashTF.transform(spamTokens)

scala> spamHashed.select("email", "features").show(false)
+--------------------------------+----------------------------------------------------------------+
|email                           |features                                                        |
+--------------------------------+----------------------------------------------------------------+
|Hi Jacek. Wanna more SPAM? Best!|(5000,[2525,2943,3093,3166,3329,3980],[1.0,1.0,1.0,1.0,1.0,1.0])|
|This is SPAM. This is SPAM      |(5000,[1713,3149,3370,4070],[1.0,1.0,2.0,2.0])                  |
+--------------------------------+----------------------------------------------------------------+

// Create labeled datasets for spam (1)
import org.apache.spark.sql.functions.lit
val spamLabeled = spamHashed.withColumn("label", lit(1d))

scala> spamLabeled.show
+---+--------------------+-----------------------------+--------------------+-----+
| id|               email|regexTok_646b6bcc4548__output|            features|label|
+---+--------------------+-----------------------------+--------------------+-----+
|  0|Hi Jacek. Wanna m...|         [hi, jacek., wann...|(5000,[2525,2943,...|  1.0|
|  1|This is SPAM. Thi...|         [this, is, spam.,...|(5000,[1713,3149,...|  1.0|
+---+--------------------+-----------------------------+--------------------+-----+

val regular = Seq(
  (2, "Hi Jacek. I hope this email finds you well. Spark up!"),
  (3, "Welcome to Apache Spark project")).toDF("id", "email")
val regularTokens = regexTok.setInputCol("email").transform(regular)
val regularHashed = hashTF.transform(regularTokens)
// Create labeled datasets for non-spam regular emails (0)
val regularLabeled = regularHashed.withColumn("label", lit(0d))

val training = regularLabeled.union(spamLabeled).cache

scala> training.show
+---+--------------------+-----------------------------+--------------------+-----+
| id|               email|regexTok_646b6bcc4548__output|            features|label|
+---+--------------------+-----------------------------+--------------------+-----+
|  2|Hi Jacek. I hope ...|         [hi, jacek., i, h...|(5000,[72,105,942...|  0.0|
|  3|Welcome to Apache...|         [welcome, to, apa...|(5000,[2894,3365,...|  0.0|
|  0|Hi Jacek. Wanna m...|         [hi, jacek., wann...|(5000,[2525,2943,...|  1.0|
|  1|This is SPAM. Thi...|         [this, is, spam.,...|(5000,[1713,3149,...|  1.0|
+---+--------------------+-----------------------------+--------------------+-----+

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

// the following calls train by the Predictor contract (see above)
val lrModel = lr.fit(training)

// Let's predict whether an email is a spam or not
val email = Seq("Hi Jacek. you doing well? Bye!").toDF("email")
val emailTokens = regexTok.setInputCol("email").transform(email)
val emailHashed = hashTF.transform(emailTokens)

scala> lrModel.transform(emailHashed).select("prediction").show
+-----------------+
|       prediction|
+-----------------+
|0.563603440350882|
+-----------------+
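
The prediction is a continuous score, not a class, so turning it into a spam/regular verdict requires a threshold. A sketch, assuming an arbitrary cut-off of 0.5:

import org.apache.spark.sql.functions.{col, when}
lrModel.transform(emailHashed)
  .select(when(col("prediction") > 0.5, "spam").otherwise("regular") as "verdict")
  .show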
