package pl.jaceklaskowski.kafka
import java.util
import org.apache.kafka.clients.consumer.{ConsumerInterceptor, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
/**
 * A `ConsumerInterceptor` for `String` keys and values that traces every
 * callback to standard output.
 *
 * None of the callbacks alter their arguments; `onConsume` hands back the very
 * same `ConsumerRecords` instance it was given.
 */
class KafkaInterceptor extends ConsumerInterceptor[String, String] {

  /**
   * Prints the callback name together with the supplied configuration map.
   *
   * @param configs configuration entries passed to this interceptor
   */
  override def configure(configs: util.Map[String, _]): Unit = {
    val message = s"KafkaInterceptor.configure($configs)"
    println(message)
  }

  /**
   * Prints the callback name followed by one `=> <record>` line per record.
   *
   * @param records the batch of records passed to this interceptor
   * @return the same `records` instance, unchanged
   */
  override def onConsume(records: ConsumerRecords[String, String]): ConsumerRecords[String, String] = {
    import scala.collection.JavaConverters._

    println("KafkaInterceptor.onConsume")
    // ConsumerRecords is a Java Iterable; view it as a Scala one to loop over it.
    for (record <- records.asScala) println(s"=> $record")
    records
  }

  /**
   * Prints the callback name and then the offsets map on a separate line.
   *
   * @param offsets the per-partition offsets passed to this interceptor
   */
  override def onCommit(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    println("KafkaInterceptor.onCommit")
    println(s"$offsets")
  }

  /** Prints the callback name; there are no resources to release. */
  override def close(): Unit = println("KafkaInterceptor.close")
}