FileStreamSource

FileStreamSource is a Source that reads files from the path directory as they appear. It uses LongOffset offsets.

Note
It is used by DataSource.createSource for FileFormat.

At instantiation time, you can provide the schema of the data and dataFrameBuilder, the function that getBatch uses to build a DataFrame.

// NOTE The source directory must exist
// mkdir text-logs

val df = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("text-logs")

scala> df.printSchema
root
|-- value: string (nullable = true)

Batches are indexed.

It lives in the org.apache.spark.sql.execution.streaming package.

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

// You should have input-json directory available
val in = spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")

val input = in.transform { ds =>
  println("transform executed")  // <-- it's going to be executed once only
  ds
}

scala> input.isStreaming
res9: Boolean = true

It tracks already-processed files in the seenFiles hash map.

Tip

Enable DEBUG or TRACE logging level for org.apache.spark.sql.execution.streaming.FileStreamSource to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSource=TRACE

Refer to Logging.

Creating FileStreamSource Instance

Caution
FIXME

Options

maxFilesPerTrigger

The maxFilesPerTrigger option specifies the maximum number of files to read per trigger (batch). The file stream source reads at most that many files at a time, which enables rate limiting.

It also allows a static set of files to be used like a stream for testing, since the file set is processed at most maxFilesPerTrigger files at a time.
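
The effect of the option on a static set of files can be pictured with plain Scala collections. This is a minimal sketch, not Spark's implementation: the file names and the batches value are hypothetical, and grouped merely stands in for the source picking up at most maxFilesPerTrigger unseen files per trigger.

```scala
// Hypothetical static set of input files (not taken from Spark)
val files = Seq("a.txt", "b.txt", "c.txt", "d.txt", "e.txt")

// Assumed value of the maxFilesPerTrigger option
val maxFilesPerTrigger = 2

// Each inner Seq models the files picked up by one trigger (batch)
val batches: Seq[Seq[String]] = files.grouped(maxFilesPerTrigger).toSeq

batches.zipWithIndex.foreach { case (batch, id) =>
  println(s"batch $id: ${batch.mkString(", ")}")
}
```

With five files and a limit of two, the set is consumed over three triggers, and the last one carries the single remaining file.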

schema

If the schema is specified at instantiation time (using optional dataSchema constructor parameter) it is returned.

Otherwise, fetchAllFiles internal method is called to list all the files in a directory.

When there is at least one file, the schema is calculated using the dataFrameBuilder constructor parameter function. Otherwise, an IllegalArgumentException("No schema specified") is thrown, unless the provider is text (given as the providerName constructor parameter), in which case the default schema with a single value column of type StringType is assumed.

Note
text as the value of providerName constructor parameter denotes text file stream provider.
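
The decision order described above can be sketched in plain Scala. This is an illustration under stated assumptions, not the actual FileStreamSource code: Schema is a simplified stand-in for StructType, and resolveSchema and inferSchema are hypothetical names (inferSchema plays the role of dataFrameBuilder).

```scala
// Simplified schema model: column name -> type name (Spark uses StructType)
type Schema = Seq[(String, String)]

// Hypothetical helper that mirrors the decision order described above
def resolveSchema(
    dataSchema: Option[Schema],
    providerName: String,
    files: Seq[String],
    inferSchema: Seq[String] => Schema): Schema =
  dataSchema.getOrElse {
    // No explicit schema: infer it from the files when there are any
    if (files.nonEmpty) inferSchema(files)
    // No files: only the text provider has a default schema
    else if (providerName == "text") Seq("value" -> "string")
    else throw new IllegalArgumentException("No schema specified")
  }
```

For example, resolveSchema(None, "text", Nil, _ => Nil) yields the single value column, while the same call with "json" as the provider throws.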

getOffset Method

getOffset: Option[Offset]
Note
getOffset is part of the Source Contract to find the latest offset.

getOffset…​FIXME

The maximum offset (getOffset) is calculated by fetching all the files in path excluding files that start with _ (underscore).

When computing the maximum offset using getOffset, you should see the following DEBUG message in the logs:

Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms

When computing the maximum offset using getOffset, it also filters out the files that were already seen (tracked in seenFiles internal registry).

You should see the following DEBUG message in the logs (depending on the status of a file):

new file: $file
// or
old file: $file
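
The two filters described above (skipping names that start with an underscore, then skipping files already seen) can be sketched as follows. This is a simplified model rather than Spark's code: seenFiles here is a plain mutable set, the listing contains bare file names, and newFilesIn is a hypothetical helper.

```scala
import scala.collection.mutable

// Simplified stand-in for the seenFiles internal registry
val seenFiles = mutable.Set[String]()

// Hypothetical helper: returns the files not seen before and records them
def newFilesIn(listing: Seq[String]): Seq[String] = {
  // Skip files whose name starts with an underscore, e.g. _SUCCESS
  val visible = listing.filterNot(_.startsWith("_"))
  // Keep only the files that were not seen in an earlier listing
  val fresh = visible.filterNot(seenFiles.contains)
  seenFiles ++= fresh
  fresh
}

val first = newFilesIn(Seq("_SUCCESS", "part-0.txt", "part-1.txt"))
val second = newFilesIn(Seq("_SUCCESS", "part-0.txt", "part-1.txt", "part-2.txt"))
```

The second listing reports only part-2.txt as new, because the other two files were recorded during the first call.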

Generating DataFrame for Streaming Batch — getBatch Method

FileStreamSource.getBatch asks metadataLog for the batch.

You should see the following INFO and DEBUG messages in the logs:

INFO Processing ${files.length} files from ${startId + 1}:$endId
DEBUG Streaming ${files.mkString(", ")}

The method to create a result batch is given at instantiation time (as dataFrameBuilder constructor parameter).
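
The range in the INFO message (startId + 1 up to endId) suggests how the files of a batch are collected. The sketch below models that lookup with a plain Map. It is an assumption-laden illustration: the in-memory metadataLog, the filesForBatch helper, and the use of -1 for a missing start offset are all hypothetical stand-ins for the real metadata log.

```scala
// Simplified stand-in for metadataLog: batch id -> files recorded for that batch
val metadataLog: Map[Long, Seq[String]] = Map(
  0L -> Seq("part-0.txt"),
  1L -> Seq("part-1.txt", "part-2.txt"),
  2L -> Seq("part-3.txt"))

// Hypothetical helper: files of all batches after `start`, up to and including `end`
def filesForBatch(start: Option[Long], end: Long): Seq[String] = {
  // Assumption: no start offset means reading from the very first batch
  val startId = start.getOrElse(-1L)
  ((startId + 1) to end).flatMap(id => metadataLog.getOrElse(id, Seq.empty))
}
```

In this model, the collected file names are what would be handed to the dataFrameBuilder function to produce the batch's DataFrame.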

metadataLog

metadataLog is a metadata store located at metadataPath (which is a constructor parameter).

Note
It extends HDFSMetadataLog[Seq[String]].
Caution
FIXME Review HDFSMetadataLog

fetchMaxOffset Internal Method

fetchMaxOffset(): FileStreamSourceOffset

fetchMaxOffset…​FIXME

Note
fetchMaxOffset is used exclusively when FileStreamSource is requested to getOffset.

fetchAllFiles Internal Method

fetchAllFiles(): Seq[(String, Long)]

fetchAllFiles…​FIXME

Note
fetchAllFiles is used exclusively when FileStreamSource is requested to fetchMaxOffset.

allFilesUsingMetadataLogFileIndex Internal Method

allFilesUsingMetadataLogFileIndex(): Seq[FileStatus]

allFilesUsingMetadataLogFileIndex simply creates a new MetadataLogFileIndex and requests it to allFiles.

Note
allFilesUsingMetadataLogFileIndex is used exclusively when FileStreamSource is requested to fetchAllFiles (as part of fetchMaxOffset, which in turn is used by getOffset).
