Alternatives to RDD.cartesian for fuzzy join in Apache Spark

Published 2019-08-10 14:51

Question:

I'm new to Spark and Scala, but I have the following use case to play around with.

I have n tweets and m companies, with n >> m, each in an RDD. I want to join them together to check which companies are mentioned in each tweet.

In iterative programming I'd do something like a nested loop join with a custom matching function.
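In plain Scala that would look roughly like this sketch, where isMatch stands in for whatever custom matching function I'd use:

// Naive nested-loop join over plain Scala collections: O(n * m) comparisons
def isMatch(tweetText: String, company: String): Boolean =
  tweetText.toLowerCase.contains(company.toLowerCase)

def nestedLoopJoin(tweets: Seq[String], companies: Seq[String]): Seq[(String, String)] =
  for {
    tweetText <- tweets              // n tweets
    company   <- companies           // m companies, n >> m
    if isMatch(tweetText, company)   // custom matching function
  } yield (company, tweetText)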

In Scala and Spark I'm using the cartesian function on the RDDs, but this gives me an out-of-memory exception, even though I've already set spark.executor.memory to 12g.

Here is my code:

val tweets = sc.textFile("/Users/romeokienzler/Documents/proj/conferences/velocity15/mytwitter/tweets.csv")
val companies = sc.textFile("/Users/romeokienzler/Documents/proj/conferences/velocity15/mytwitter/companies.csv")

// Keep only well-formed tweet rows and split them into columns
val tweetsHeaderAndRows = tweets.filter(s => s.count(_ == ',') == 3).map(line => line.split(",").map(_.trim))
val tweetsHeader = tweetsHeaderAndRows.first
val tweetsData = tweetsHeaderAndRows.filter(_(0) != tweetsHeader(0))        // drop the header row
val tweetMaps = tweetsData.map(splits => tweetsHeader.zip(splits).toMap)    // column name -> value

val companiesData = companies.filter(s => !s.equals("COMPANY_NAME_ID"))     // drop the header row

// Cartesian product of all tweets with all companies, keeping only the matching pairs
val tweetsWithCompany = tweetMaps.cartesian(companiesData).filter(t => t._1("TEXT").toLowerCase().contains(t._2.toLowerCase))
val companyAndScore = tweetsWithCompany.map(t => (t._2, t._1("SCORE").toDouble))
val companyFrequency = companyAndScore.map(t => (t._1, 1)).reduceByKey(_ + _)  // count matches per company

companyFrequency.collect()

And here are the two files:

https://github.com/romeokienzler/developerWorks/raw/master/tweets.csv

https://github.com/romeokienzler/developerWorks/raw/master/companies.csv

What would be a better way to do this join?

Thanks a lot in advance...

Answer 1:

If the companies data is quite small (let's say a few MB or less), you can collect it to the driver with val companiesData = companies.collect() and then send it out as a broadcast variable. That variable is then available inside any of your RDD functions over the tweets, which gets you something much closer to your iterative-programming idea.

Looking at the raw data you have, the companies data is very small in this example, something like ~500 company names, so it should fit into a local collection just fine. Assuming the starting RDDs are already munged properly:

import org.apache.spark.rdd.RDD

// Tweets as column-name -> value maps (as in tweetMaps above), companies as plain names
val tweets: RDD[Map[String, String]] = ...
val companies: RDD[String] = ...

// Collect the small companies list to the driver and broadcast it to the executors
val companyNames = companies.collect()
val companyNamesVar = sc.broadcast(companyNames)

val companyAndScore = tweets.flatMap { tweet =>
  // find all the company names mentioned in this tweet
  val matchedNames: Seq[String] = companyNamesVar.value.filter { name =>
    tweet("TEXT").toLowerCase.contains(name.toLowerCase)
  }

  matchedNames.map(companyName => (companyName, tweet("SCORE").toDouble))
}

val companyFrequency = companyAndScore.reduceByKey(_ + _)  // total SCORE per company
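
If what you actually want is a count of matching tweets per company (as the variable name in the question suggests) rather than a sum of scores, a small variation of the same broadcast approach works; this sketch assumes the same tweets and companyNamesVar as above:

// Count matching tweets per company instead of summing scores
val companyCount = tweets.flatMap { tweet =>
  companyNamesVar.value
    .filter(name => tweet("TEXT").toLowerCase.contains(name.toLowerCase))
    .map(name => (name, 1))
}.reduceByKey(_ + _)

companyCount.collect()  // the per-company result is small, so collecting it is safe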