I'm new to Spark and Scala but have the following use case to play around with.
I have n tweets and m companies, with n >> m, both in RDDs. I want to join them together in order to check which companies are mentioned in each tweet.
In imperative programming I'd do something like a nested-loop join with a custom matching function.
In Scala and Spark I'm using the cartesian function on the RDD, but this gives me an out-of-memory exception, even though I've already set spark.executor.memory to 12g.
Here is my code:
val tweets = sc.textFile("/Users/romeokienzler/Documents/proj/conferences/velocity15/mytwitter/tweets.csv")
val companies = sc.textFile("/Users/romeokienzler/Documents/proj/conferences/velocity15/mytwitter/companies.csv")
val tweetsHeaderAndRows = tweets.filter(s => s.count(_ == ',')==3).map(line => line.split(",").map(_.trim))
val tweetsHeader = tweetsHeaderAndRows.first
val tweetsData = tweetsHeaderAndRows.filter(_(0) != tweetsHeader(0))
val tweetMaps = tweetsData.map(splits => tweetsHeader.zip(splits).toMap)
val companiesData = companies.filter(s => !s.equals("COMPANY_NAME_ID"))
val tweetsWithCompany = tweetMaps.cartesian(companiesData).filter(t => t._1("TEXT").toLowerCase().contains(t._2.toLowerCase))
val companyAndScore = tweetsWithCompany.map(t => (t._2,t._1("SCORE").toDouble))
val companyFrequency = companyAndScore.map(t => (t._1,1)).reduceByKey(_ + _)
companyFrequency.collect()
And here are the two files:
https://github.com/romeokienzler/developerWorks/raw/master/tweets.csv
https://github.com/romeokienzler/developerWorks/raw/master/companies.csv
What would be a better way to do the join?
Thanks a lot in advance...
If the companies data is quite small (let's say MBs or less), you can call
val companiesData = companies.collect()
to download it to the driver and then send it out as a broadcast variable. That variable will then be available in any of your RDD functions on tweets, which gets you something much closer to your nested-loop idea. Looking at the raw data, the companies list is indeed very small in this example, roughly 500 company names, so it should fit into a local collection just fine. Assume the starting RDDs are already munged properly:
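Here is a sketch of how that could look, reusing the variable names from your question (tweetMaps with TEXT and SCORE keys, companiesData with one company name per line). The matching logic is pulled out into a plain function so it can be tested without a cluster; the Spark wiring around it is shown in comments and assumes sc is your SparkContext:

```scala
// Pure matching logic: which of the (small) list of company names
// appear in a tweet's text? Case-insensitive substring match, same
// as the filter in your cartesian version.
def matchingCompanies(text: String, companies: Seq[String]): Seq[String] = {
  val lower = text.toLowerCase
  companies.filter(c => lower.contains(c.toLowerCase))
}

// Spark wiring (sketch):
// val companiesBc = sc.broadcast(companiesData.collect().toSeq)
// val companyAndScore = tweetMaps.flatMap { tweet =>
//   matchingCompanies(tweet("TEXT"), companiesBc.value)
//     .map(company => (company, tweet("SCORE").toDouble))
// }
// val companyFrequency = companyAndScore.map(t => (t._1, 1)).reduceByKey(_ + _)
```

Because the companies list rides along inside each task via the broadcast, Spark only makes a single pass over the large tweets RDD instead of materializing the full n x m cartesian product, which is what was blowing up your memory.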