ConsumerInterceptor

Example

package pl.jaceklaskowski.kafka

import java.util

import org.apache.kafka.clients.consumer.{ConsumerInterceptor, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

/**
 * Example [[ConsumerInterceptor]] for `String` keys and `String` values.
 *
 * Every callback implemented here only writes a trace line (plus its argument, where
 * there is one) to standard output; nothing is modified along the way.
 */
class KafkaInterceptor extends ConsumerInterceptor[String, String] {

  /**
   * Prints the configuration map this interceptor was given.
   *
   * @param configs configuration entries passed to the interceptor
   */
  override def configure(configs: util.Map[String, _]): Unit =
    println(s"KafkaInterceptor.configure($configs)")

  /**
   * Prints a header line followed by one line per record in the batch.
   *
   * @param records the batch handed to the interceptor
   * @return the very same `records` instance, untouched
   */
  override def onConsume(records: ConsumerRecords[String, String]):
      ConsumerRecords[String, String] = {
    // Needed for `.asScala` on the Java `Iterable` that `ConsumerRecords` implements.
    import scala.collection.JavaConverters._

    println("KafkaInterceptor.onConsume")
    for (record <- records.asScala) {
      println(s"=> $record")
    }
    records
  }

  /**
   * Prints a header line and then the offsets map it was called with.
   *
   * @param offsets offsets per topic partition passed to the interceptor
   */
  override def onCommit(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    println("KafkaInterceptor.onCommit")
    println(s"$offsets")
  }

  /** Prints a trace line; there is nothing else to release. */
  override def close(): Unit =
    println("KafkaInterceptor.close")
}

onConsume Method

Caution
FIXME: Describe the `onConsume` method.

results matching ""

    No results matching ""