FileStreamSink — Streaming Sink for File-Based Data Sources

FileStreamSink is a concrete streaming sink that writes out the results of a streaming query to files (of the specified FileFormat) in the root path.

// in is assumed to be a streaming Dataset, e.g. created with spark.readStream
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = in.
  writeStream.
  format("parquet").
  option("path", "parquet-output-dir").
  option("checkpointLocation", "checkpoint-dir").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  start

FileStreamSink is created exclusively when DataSource is requested to create a streaming sink for a file-based data source (i.e. FileFormat).

Tip
Read up on FileFormat in The Internals of Spark SQL book.

FileStreamSink supports Append output mode only.
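
For instance, starting the query above with any other output mode fails at start time (the exact error message can vary across Spark versions):

// Fails: file-based sinks support the Append output mode only
val failed = in.
  writeStream.
  format("parquet").
  option("path", "parquet-output-dir").
  option("checkpointLocation", "checkpoint-dir").
  outputMode(OutputMode.Complete).
  start
// org.apache.spark.sql.AnalysisException:
//   Data source parquet does not support Complete output mode;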

FileStreamSink uses the spark.sql.streaming.fileSink.log.deletion configuration property (as isDeletingExpiredLog) to control whether expired files of the sink's metadata log are deleted.
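
The property is internal and defaults to true; for illustration, it could be changed like any other configuration property:

// Keep expired metadata log files around (internal setting; default: true)
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", false)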

The textual representation of FileStreamSink is FileSink[path].
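
Once the query above has processed at least one batch, the representation shows up as the sink description in the streaming progress, e.g.:

// Requires at least one completed trigger (lastProgress is null before that)
println(sq.lastProgress.sink.description)
// FileSink[parquet-output-dir]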

FileStreamSink uses the _spark_metadata directory (under the root path) for…FIXME
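
For illustration, you can list the metadata log directory of the example query above (assuming at least one batch has been written):

// Inspect the metadata log directory under the sink's root path
import org.apache.hadoop.fs.Path
val metadataPath = new Path("parquet-output-dir", "_spark_metadata")
val fs = metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.listStatus(metadataPath).foreach(s => println(s.getPath.getName))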

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.FileStreamSink to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSink=ALL

Refer to Logging.

Creating FileStreamSink Instance

FileStreamSink takes the following to be created:

  • SparkSession

  • Root directory

  • FileFormat

  • Names of the partition columns

  • Configuration options

FileStreamSink initializes the internal properties.
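
FileStreamSink is an internal class that is created for you (as described above), never directly in user code. Just to visualize the constructor parameters, a direct instantiation could look as follows (a sketch only; the arguments follow the list above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.FileStreamSink

val spark = SparkSession.builder.master("local[*]").getOrCreate()
val sink = new FileStreamSink(
  spark,                   // SparkSession
  "parquet-output-dir",    // root directory
  new ParquetFileFormat(), // FileFormat
  Seq.empty,               // names of the partition columns
  Map.empty)               // configuration options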

"Adding" Batch of Data to Sink — addBatch Method

addBatch(
  batchId: Long,
  data: DataFrame): Unit

Note
addBatch is part of the Sink Contract to "add" a batch of data to the sink.

addBatch…FIXME
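
A simplified sketch of the control flow (based on the Spark sources; details vary across Spark versions): addBatch skips a batch that is already committed to the metadata log (e.g. after a restart) and writes the data out otherwise.

// Simplified sketch only, not the actual implementation
def addBatch(batchId: Long, data: DataFrame): Unit = {
  if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    // the batch was committed already; skip it (e.g. after a restart)
  } else {
    // write the data out as files (FileFormatWriter) and
    // record the new files in the fileLog for the batchId
  }
}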

Creating BasicWriteJobStatsTracker — basicWriteJobStatsTracker Internal Method

basicWriteJobStatsTracker: BasicWriteJobStatsTracker

basicWriteJobStatsTracker simply creates a BasicWriteJobStatsTracker with the basic metrics:

  • number of written files

  • bytes of written output

  • number of output rows

  • number of dynamic partitions
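
For illustration, a rough sketch of assembling such a tracker with the four metrics above (these are internal APIs and the metric keys are illustrative; see BasicWriteJobStatsTracker in the Spark sources):

import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.SerializableConfiguration

val hadoopConf = spark.sessionState.newHadoopConf()
val metrics = Map(
  "numFiles" -> SQLMetrics.createMetric(spark.sparkContext, "number of written files"),
  "numOutputBytes" -> SQLMetrics.createMetric(spark.sparkContext, "bytes of written output"),
  "numOutputRows" -> SQLMetrics.createMetric(spark.sparkContext, "number of output rows"),
  "numParts" -> SQLMetrics.createMetric(spark.sparkContext, "number of dynamic partitions"))
val tracker = new BasicWriteJobStatsTracker(
  new SerializableConfiguration(hadoopConf), metrics)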

Note
basicWriteJobStatsTracker is used exclusively when FileStreamSink is requested to addBatch.

hasMetadata Object Method

hasMetadata(
  path: Seq[String],
  hadoopConf: Configuration): Boolean

hasMetadata…FIXME
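
For illustration, hasMetadata can be used to check whether a directory contains FileStreamSink metadata, i.e. was written by a streaming query:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.streaming.FileStreamSink

// true if parquet-output-dir/_spark_metadata exists
FileStreamSink.hasMetadata(Seq("parquet-output-dir"), new Configuration())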

Note

hasMetadata is used when:

  • DataSource (Spark SQL) is requested to resolve a FileFormat relation (resolveRelation) and creates a HadoopFsRelation

  • FileStreamSource is requested to fetchAllFiles

Internal Properties

Name Description

basePath
Base path (Hadoop's Path for the given path)
Used when…FIXME

logPath
Metadata log path (Hadoop's Path for the base path and the _spark_metadata directory)
Used exclusively to create the FileStreamSinkLog

fileLog
FileStreamSinkLog (for the version and the metadata log path)
Used exclusively when FileStreamSink is requested to addBatch

hadoopConf
Hadoop's Configuration
Used when…FIXME
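
A sketch of how the properties relate to one another (based on the Spark sources):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.FileStreamSink

val basePath = new Path("parquet-output-dir")
val logPath = new Path(basePath, FileStreamSink.metadataDir) // <root>/_spark_metadata
// fileLog is then a FileStreamSinkLog over logPath, i.e.
// new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)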
