// NOTE The source directory must exist
// mkdir text-logs
val df = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("text-logs")
scala> df.printSchema
root
 |-- value: string (nullable = true)
FileStreamSource
FileStreamSource is a Source that reads files (in a supported FileFormat, e.g. text or json) from the path directory as they appear. It uses LongOffset offsets.
Note: FileStreamSource is used by DataSource.createSource for a FileFormat.
At instantiation time, you can provide the schema of the data and dataFrameBuilder, the function used to build a DataFrame in getBatch.
Batches are indexed.
FileStreamSource lives in the org.apache.spark.sql.execution.streaming package.
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)
// You should have input-json directory available
val in = spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
val input = in.transform { ds =>
  println("transform executed") // <-- it's going to be executed once only
  ds
}
scala> input.isStreaming
res9: Boolean = true
FileStreamSource tracks already-processed files in the seenFiles hash map.
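Tip: Enable INFO or DEBUG logging level for the org.apache.spark.sql.execution.streaming.FileStreamSource logger to see what happens inside. Refer to Logging.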
Options
maxFilesPerTrigger
The maxFilesPerTrigger option specifies the maximum number of files per trigger (batch). It limits the file stream source to reading at most maxFilesPerTrigger files at a time and hence enables rate limiting.
It also allows a static set of files to be used like a stream for testing, since the file set is processed maxFilesPerTrigger files at a time.
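To make the rate-limiting effect concrete, here is a minimal pure-Scala model (no Spark dependency) of a static set of files consumed maxFilesPerTrigger files per trigger. MaxFilesPerTriggerModel and plan are hypothetical names used only for illustration; the model assumes the file set does not change between triggers and keeps files in the order given, whereas a real source also has to cope with files arriving over time.

```scala
// Pure-Scala model of maxFilesPerTrigger rate limiting (not Spark's code).
object MaxFilesPerTriggerModel {
  // Splits the available files into per-trigger batches, preserving their order.
  def plan(files: Seq[String], maxFilesPerTrigger: Option[Int]): Seq[Seq[String]] =
    maxFilesPerTrigger match {
      case Some(n) =>
        require(n > 0, "maxFilesPerTrigger must be positive")
        files.grouped(n).toList
      case None =>
        // Without a limit, every available file goes into a single batch
        if (files.isEmpty) Nil else List(files)
    }

  def main(args: Array[String]): Unit = {
    val files = Seq("part-0.json", "part-1.json", "part-2.json")
    // With a limit of 1 file per trigger, three files become three micro-batches
    plan(files, Some(1)).zipWithIndex.foreach { case (batch, i) =>
      println(s"trigger $i reads: ${batch.mkString(", ")}")
    }
  }
}
```

Running main prints one line per trigger, each naming a single file, which mirrors how a limit of 1 replays a fixed directory as a stream.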
schema
If the schema is specified at instantiation time (using the optional dataSchema constructor parameter), it is returned.
Otherwise, the fetchAllFiles internal method is called to list all the files in the directory.
When there is at least one file, the schema is calculated using the dataFrameBuilder constructor parameter function. Otherwise, an IllegalArgumentException("No schema specified") is thrown, unless the source is for the text provider (given as the providerName constructor parameter), in which case the default schema with a single value column of type StringType is assumed.
Note: text as the value of the providerName constructor parameter denotes the text file stream provider.
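The schema resolution rules above can be summarized in a small pure-Scala sketch. It is a model, not Spark's implementation: SchemaResolutionModel, resolveSchema and inferSchema are hypothetical names, schemas are simplified to (column name, type name) pairs rather than StructType, and inferSchema stands in for taking the schema of the DataFrame that dataFrameBuilder builds.

```scala
// Pure-Scala model of FileStreamSource's schema resolution (not Spark's code).
// Schemas are simplified to (column name, type name) pairs instead of StructType.
object SchemaResolutionModel {
  type Schema = Seq[(String, String)]

  def resolveSchema(
      dataSchema: Option[Schema],          // user-specified schema, if any
      providerName: String,                // e.g. "text" or "json"
      fetchAllFiles: () => Seq[String],    // lists the files in the source directory
      inferSchema: Seq[String] => Schema   // stands in for dataFrameBuilder(files).schema
  ): Schema =
    dataSchema.getOrElse {
      val files = fetchAllFiles()
      if (files.nonEmpty) inferSchema(files)
      // The text provider falls back to a single string column named value
      else if (providerName == "text") Seq("value" -> "string")
      else throw new IllegalArgumentException("No schema specified")
    }
}
```

Passing fetchAllFiles as a function keeps the directory listing lazy, so it only happens when no schema was given, matching the "Otherwise" step above.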
getOffset Method
getOffset: Option[Offset]
Note: getOffset is part of the Source Contract to find the latest offset.
getOffset …FIXME
The maximum offset (getOffset) is calculated by fetching all the files in path, excluding files whose names start with _ (underscore).
When computing the maximum offset using getOffset, you should see the following DEBUG message in the logs:
Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms
When computing the maximum offset, getOffset also filters out the files that were already seen (tracked in the seenFiles internal registry).
You should see one of the following DEBUG messages in the logs, depending on the status of a file:
new file: $file
// or
old file: $file
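The following sketch models the two filtering steps getOffset performs on a directory listing: dropping files whose names start with an underscore and dropping files already recorded in seenFiles. NewFilesModel and newFiles are hypothetical names; seenFiles is a plain mutable HashMap from path to modification time here, println stands in for the DEBUG messages, and the model does not attempt to reproduce any other bookkeeping Spark's registry may do.

```scala
import scala.collection.mutable

// Pure-Scala model of how getOffset narrows a listing to unseen files (not Spark's code).
class NewFilesModel {
  // Stands in for the seenFiles hash map: file path -> modification time
  private val seenFiles = mutable.HashMap.empty[String, Long]

  private def fileName(path: String): String = path.substring(path.lastIndexOf('/') + 1)

  def newFiles(listed: Seq[(String, Long)]): Seq[(String, Long)] = {
    // Skip files whose name starts with an underscore, e.g. _SUCCESS
    val candidates = listed.filterNot { case (path, _) => fileName(path).startsWith("_") }
    val unseen = candidates.filter { case (path, _) =>
      val isNew = !seenFiles.contains(path)
      println(if (isNew) s"new file: $path" else s"old file: $path")
      isNew
    }
    // Remember the new files so the next listing reports them as old
    unseen.foreach { case (path, modTime) => seenFiles.put(path, modTime) }
    unseen
  }
}
```

Calling newFiles twice with overlapping listings shows the behavior: the second call reports previously returned paths as old files and returns only the paths it has not seen before.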
Generating DataFrame for Streaming Batch: getBatch Method
FileStreamSource.getBatch asks metadataLog for the batch.
You should see the following INFO and DEBUG messages in the logs:
INFO Processing ${files.length} files from ${startId + 1}:$endId
DEBUG Streaming ${files.mkString(", ")}
The function that creates the resulting batch DataFrame is given at instantiation time (as the dataFrameBuilder constructor parameter).
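Below is a hedged pure-Scala model of getBatch: it collects the files recorded in a metadata log for batches startId + 1 through endId, prints messages shaped like the INFO and DEBUG lines above, and hands the file list to a dataFrameBuilder-like function. GetBatchModel and the Map-based metadata log are assumptions for illustration, as is the convention that startId is -1 when no batch has been processed yet; a generic result type T stands in for DataFrame so the sketch runs without Spark.

```scala
// Pure-Scala model of getBatch (not Spark's code).
object GetBatchModel {
  // Stands in for metadataLog: batch id -> files recorded for that batch
  type MetadataLog = Map[Long, Seq[String]]

  // startId is the last batch already processed (-1 if none); endId is the last batch to include
  def getBatch[T](metadataLog: MetadataLog, startId: Long, endId: Long)(
      dataFrameBuilder: Seq[String] => T): T = {
    val files = (startId + 1 to endId).flatMap(id => metadataLog.getOrElse(id, Seq.empty))
    println(s"INFO Processing ${files.length} files from ${startId + 1}:$endId")
    println(s"DEBUG Streaming ${files.mkString(", ")}")
    dataFrameBuilder(files)
  }
}
```

Keeping the builder as a parameter mirrors the design described above: the source only decides which files belong to a batch, while the function supplied at instantiation time decides how to turn them into a result.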
metadataLog
metadataLog is a metadata storage that uses the metadataPath path (a constructor parameter).
Note: metadataLog is an HDFSMetadataLog[Seq[String]].
Caution: FIXME Review HDFSMetadataLog
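As a rough illustration of the idea behind a metadata log such as HDFSMetadataLog, this sketch stores the file list of each batch in its own file, named after the batch id, under a metadata directory, and refuses to overwrite a batch that was already recorded. SimpleMetadataLog is a hypothetical class using java.nio on the local file system with a one-path-per-line format; it does not reproduce HDFSMetadataLog's serialization format or its use of Hadoop file systems. It needs Scala 2.13 or later for scala.jdk.CollectionConverters.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Simplified model: one file per batch id under metadataPath, one file path per line.
class SimpleMetadataLog(metadataPath: Path) {
  Files.createDirectories(metadataPath)

  private def batchFile(batchId: Long): Path = metadataPath.resolve(batchId.toString)

  // Returns false (leaving the existing entry untouched) if the batch was already recorded
  def add(batchId: Long, files: Seq[String]): Boolean = {
    val target = batchFile(batchId)
    if (Files.exists(target)) false
    else {
      Files.write(target, files.asJava, StandardCharsets.UTF_8)
      true
    }
  }

  // Returns the files recorded for the batch, or None if the batch is unknown
  def get(batchId: Long): Option[Seq[String]] = {
    val source = batchFile(batchId)
    if (Files.exists(source)) Some(Files.readAllLines(source, StandardCharsets.UTF_8).asScala.toList)
    else None
  }
}
```

Returning false instead of overwriting keeps a recorded batch stable, so reading the same batch id again yields the same files.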
fetchMaxOffset Internal Method
fetchMaxOffset(): FileStreamSourceOffset
fetchMaxOffset …FIXME
Note: fetchMaxOffset is used exclusively when FileStreamSource is requested to getOffset.
fetchAllFiles Internal Method
fetchAllFiles(): Seq[(String, Long)]
fetchAllFiles …FIXME
Note: fetchAllFiles is used exclusively when FileStreamSource is requested to fetchMaxOffset.
allFilesUsingMetadataLogFileIndex Internal Method
allFilesUsingMetadataLogFileIndex(): Seq[FileStatus]
allFilesUsingMetadataLogFileIndex simply creates a new MetadataLogFileIndex and requests it for allFiles.
Note: allFilesUsingMetadataLogFileIndex is used exclusively when FileStreamSource is requested to fetchAllFiles (which fetchMaxOffset uses when FileStreamSource is requested to getOffset).