TextSocketSource

TextSocketSource is a streaming source that reads lines from a socket at the host and port (defined by parameters).

It uses lines internal in-memory buffer to keep all of the lines that were read from a socket forever.

Caution

This source is not for production use due to design contraints, e.g. infinite in-memory collection of lines read and no fault recovery.

It is designed only for tutorials and debugging.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder.getOrCreate()

// Connect to localhost:9999
// You can use "nc -lk 9999" for demos
val textSocket = spark.
  readStream.
  format("socket").
  option("host", "localhost").
  option("port", 9999).
  load

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = textSocket.as[String].map(_.toUpperCase)

val query = lines.writeStream.format("console").start

// Start typing the lines in nc session
// They will appear UPPERCASE in the terminal

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+
|    value|
+---------+
|UPPERCASE|
+---------+

scala> query.explain
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#21]
+- *MapElements <function1>, obj#20: java.lang.String
   +- *DeserializeToObject value#43.toString, obj#19: java.lang.String
      +- LocalTableScan [value#43]

scala> query.stop

lines Internal Buffer

lines: ArrayBuffer[(String, Timestamp)]

lines is the internal buffer of all the lines TextSocketSource read from the socket.

Maximum Available Offset (getOffset method)

Note
getOffset is a part of the Streaming Source Contract.

TextSocketSource's offset can either be none or LongOffset of the number of lines in the internal lines buffer.

Schema (schema method)

TextSocketSource supports two schemas:

  1. A single value field of String type.

  2. value field of StringType type and timestamp field of TimestampType type of format yyyy-MM-dd HH:mm:ss.

Tip
Refer to sourceSchema for TextSocketSourceProvider.

Creating TextSocketSource Instance

TextSocketSource(
  host: String,
  port: Int,
  includeTimestamp: Boolean,
  sqlContext: SQLContext)

When TextSocketSource is created (see TextSocketSourceProvider), it gets 4 parameters passed in:

  1. host

  2. port

  3. includeTimestamp flag

  4. SQLContext

Caution
It appears that the source did not get "renewed" to use SparkSession instead.

It opens a socket at given host and port parameters and reads a buffering character-input stream using the default charset and the default-sized input buffer (of 8192 bytes) line by line.

Caution
FIXME Review Java’s Charset.defaultCharset()

It starts a readThread daemon thread (called TextSocketSource(host, port)) to read lines from the socket. The lines are added to the internal lines buffer.

Stopping TextSocketSource (stop method)

When stopped, TextSocketSource closes the socket connection.

results matching ""

    No results matching ""