Inappropriate output while creating a DataFrame

Posted 2019-07-15 15:07

I'm trying to stream data from a Kafka topic using a Scala application. I'm able to get the data from the topic, but how do I create a DataFrame out of it?

Here is the data (in String, String format):

{
  "action": "AppEvent",
  "tenantid": 298,
  "lat": 0.0,
  "lon": 0.0,
  "memberid": 16390,
  "event_name": "CATEGORY_CLICK",
  "productUpccd": 0,
  "device_type": "iPhone",
  "device_os_ver": "10.1",
  "item_name": "CHICKEN"
}

I tried a few ways to do it, but none of them gives a satisfactory result. Each record ends up as a single string in one column:

 +--------------------+
 |                  _1|
 +--------------------+
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|

Can anyone tell me how to do the mapping so that each field goes into a separate column, like a table? The data is in Avro format.

Here is the code that gets the data from the topic:

val ssc = new StreamingContext(sc, Seconds(2))
val kafkaConf = Map[String, String](
  "" -> "####",
  "zookeeper.connect" -> "########",
  "" -> "KafkaConsumer",
  "" -> "1000000")
val topicMaps = Map("fishbowl" -> 1)
val messages = KafkaUtils
  .createStream[String, String, DefaultDecoder, DefaultDecoder](
    ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
  .map(_._2)

Please guide me on how to use foreachRDD and map() to create a proper DataFrame.

Reply #2 · 2019-07-15 15:27

To create a DataFrame from an RDD regardless of its case-class schema, use the logic below:

  rdd => {
     val dataFrame =

Here stream is the DStream returned by KafkaUtils.createStream(); foreachRDD hands you one RDD per batch.
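
The snippet above is cut off, so the following is only a sketch of one way the foreachRDD body could look, not the original answer's code. It assumes Spark 2.2 or later, and it assumes each message is a JSON string, as the printed output in the question suggests (real Avro-encoded bytes would need a different decoding step). The names JsonStreamToDataFrame, toDataFrame and process are made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; the names are illustrative only.
object JsonStreamToDataFrame {

  // Turns one batch of JSON strings into a DataFrame with one column
  // per JSON field. The schema is inferred from the records in the batch.
  def toDataFrame(spark: SparkSession, rdd: RDD[String]): DataFrame = {
    import spark.implicits._ // supplies the Encoder[String] for createDataset
    spark.read.json(spark.createDataset(rdd))
  }

  // Applies toDataFrame to every non-empty batch of the stream.
  def process(messages: DStream[String]): Unit = {
    messages.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) a session built on the streaming context's config.
        val spark = SparkSession.builder
          .config(rdd.sparkContext.getConf)
          .getOrCreate()
        val df = toDataFrame(spark, rdd)
        df.printSchema()
        df.show(5, truncate = false)
      }
    }
  }
}
```

With the question's code this would be called as JsonStreamToDataFrame.process(messages) before ssc.start(). Because the schema is inferred per batch, a batch in which a field never appears will lack that column; passing an explicit schema via spark.read.schema(...) avoids that if the field set is known up front.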
