
Q: How to convert a DirectStream from Kafka into DataFrames in Spark 1.3.0

After creating a direct stream as shown below:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
val events = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

I would like to convert the above stream into DataFrames so that I can run Hive queries over it. Could anyone please explain how this can be achieved? I am using Spark version 1.3.0.
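
For reference, the ssc, kafkaParams and topicsSet used above would typically be set up along these lines; the broker address and topic name below are placeholders, not taken from the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("KafkaDirectStreamToDF")
val ssc = new StreamingContext(conf, Seconds(10))

// Placeholder broker list and topic name -- adjust to the actual cluster
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topicsSet = Set("my-topic")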

Answer 1:

As explained in the Spark Streaming programming guide, try this:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Lazily instantiated singleton so the SQLContext is created once and reused across batches
object SQLContextSingleton {
  @transient private var instance: SQLContext = null

  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

// Case class giving the (key, value) pairs a schema for the DataFrame
case class Event(key: String, value: String)

events.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert each micro-batch RDD of (key, value) pairs into a DataFrame and display it
  val dataFrame = rdd.map { case (key, value) => Event(key, value) }.toDF()
  dataFrame.show()
}
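
Since the end goal is to run Hive-style queries over the stream, the DataFrame can also be registered as a temporary table and queried with SQL inside the same foreachRDD block, as the programming guide does. A minimal sketch; the table name and the query are illustrative, and for querying existing Hive tables a HiveContext (org.apache.spark.sql.hive.HiveContext) would be used in place of the plain SQLContext:

events.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val dataFrame = rdd.map { case (key, value) => Event(key, value) }.toDF()

  // Register the current micro-batch as a temp table so it can be queried with SQL
  dataFrame.registerTempTable("events")

  // Illustrative query over the registered table
  sqlContext.sql("SELECT key, COUNT(*) AS total FROM events GROUP BY key").show()
}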

apache-spark  hive  streaming  apache-kafka