I am trying to do some analysis on sets. I have a sample data set that looks like this:
orders.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
All it is is a single field containing a list of numbers that represent IDs.
Here is the Spark script I am trying to run:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)
val dataframe = sql.read.json("orders.json")

// Explode the items array twice so each row becomes every ordered
// pair of IDs that appeared together in the same order.
val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

// Drop self-pairs, then count occurrences of each (item1, item2) pair.
val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

// This is where I get stuck.
val recs = grouped
  .groupBy("item1")
Creating expanded and grouped works fine. In a nutshell, expanded is a list of all the possible pairs of IDs where the two IDs were in the same original set, and grouped filters out the IDs that were matched with themselves, then groups all the unique pairs of IDs together and produces a count for each (a small plain-Scala sketch of the pairing logic follows the sample below). The schema and a data sample of grouped are:
root
|-- item1: long (nullable = true)
|-- item2: long (nullable = true)
|-- count: long (nullable = false)
[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
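Just to make the pairing logic concrete, the two explodes plus the where clause are, for a single order, doing essentially this (plain Scala for illustration only, not part of the job itself):

// Plain-Scala analogue of the double explode + self-pair filter,
// applied to one order's items list:
val items = List(1L, 2L, 5L)
val pairs = for {
  item1 <- items
  item2 <- items
  if item1 != item2
} yield (item1, item2)
// pairs == List((1,2), (1,5), (2,1), (2,5), (5,1), (5,2))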
So, my question is: how do I now group on the first item in each result so that I have a list of tuples? For the example data above, I would expect something similar to this:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
As you can see in my script with recs, I thought I would start by doing a groupBy on 'item1', which is the first item in each row. But after that you are left with a GroupedData object that supports very few operations; really, you are only left with aggregations like sum, avg, etc. I just want to list the tuples from each result.
I could easily use RDD functions at this point, but that departs from using DataFrames. Is there a way to do this with the DataFrame functions?
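For reference, the RDD-based fallback I have in mind is roughly this (just a sketch, picking the columns of grouped by position):

// Sketch of the RDD detour I would rather avoid, continuing from grouped above.
val recsViaRdd = grouped.rdd
  .map(row => (row.getLong(0), (row.getLong(1), row.getLong(2)))) // (item1, (item2, count))
  .groupByKey()
  .mapValues(_.toList)
// recsViaRdd: RDD[(Long, List[(Long, Long)])]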