Deduplicate Unary Logical Operator

Deduplicate is a unary logical operator (i.e. a LogicalPlan) that represents the dropDuplicates operator, which drops duplicate records for a given subset of columns.

Deduplicate has the streaming flag enabled for streaming Datasets.

val uniqueRates = spark.
  readStream.
  format("rate").
  load.
  dropDuplicates("value")  // <-- creates Deduplicate logical operator
// Note the streaming flag
scala> println(uniqueRates.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#33L], true  // <-- streaming flag enabled
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4785f176,rate,List(),None,List(),None,Map(),None), rate, [timestamp#32, value#33L]
Caution
FIXME Example with duplicates across batches to show that Deduplicate keeps state and that the withWatermark operator should also be used to limit how much state is stored (to not cause OOM)
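
A minimal sketch of that pattern (the 10-second delay and the choice of columns are assumptions for illustration; the event-time column is included in the deduplication keys so that state older than the watermark can be dropped):

val dedupedWithWatermark = spark.
  readStream.
  format("rate").
  load.
  withWatermark("timestamp", "10 seconds").  // <-- bound how long per-key state is kept
  dropDuplicates("value", "timestamp")       // <-- include the event-time column in the keys
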
Note

UnsupportedOperationChecker ensures that the dropDuplicates operator is not used after aggregation on streaming Datasets.

The following code is not supported in Structured Streaming and results in an AnalysisException.

import spark.implicits._
import org.apache.spark.sql.functions.{count, window}
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count").
  dropDuplicates  // <-- after groupBy

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start
org.apache.spark.sql.AnalysisException: dropDuplicates is not supported after aggregation on a streaming DataFrame/Dataset;;
Note

The Deduplicate logical operator is translated (aka planned) as follows (see the sketch after the list):

  • StreamingDeduplicateExec physical operator by the StreamingDeduplicationStrategy execution planning strategy for streaming Datasets (aka streaming plans)

  • Aggregate logical operator by the ReplaceDeduplicateWithAggregate logical optimization rule for non-streaming/batch Datasets (aka batch plans)
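
A quick way to observe both translations (a sketch; the exact plan output depends on the Spark version):

// Batch: ReplaceDeduplicateWithAggregate rewrites Deduplicate into an Aggregate
val batchDistinct = spark.range(5).dropDuplicates("id")
println(batchDistinct.queryExecution.optimizedPlan.numberedTreeString)  // <-- no Deduplicate, an Aggregate instead

// Streaming: StreamingDeduplicationStrategy plans Deduplicate as StreamingDeduplicateExec
val sq = uniqueRates.writeStream.format("console").start
sq.explain()  // <-- look for StreamingDeduplicateExec
sq.stop()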

The output schema of Deduplicate is exactly the child's output schema.

Creating Deduplicate Instance

Deduplicate takes the following when created:

  • Attributes for keys

  • Child logical operator (i.e. LogicalPlan)

  • Flag whether the logical operator is for streaming (enabled) or batch (disabled) mode
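
A quick way to see those three arguments on a real plan is to pattern-match on the logical plan of the uniqueRates streaming Dataset from above (a sketch, assuming the Spark version described here, where Deduplicate takes exactly these three arguments):

import org.apache.spark.sql.catalyst.plans.logical.Deduplicate

uniqueRates.queryExecution.logical match {
  case Deduplicate(keys, child, streaming) =>
    println(s"keys=$keys streaming=$streaming child=${child.nodeName}")  // <-- the three constructor arguments
  case other =>
    println(s"Not a Deduplicate: ${other.nodeName}")
}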
