Spark filtering based on matches in two Arrays in

Posted 2019-09-20 07:49

Question:

I have an RDD of words, and another RDD containing strings. Wherever a word from the first RDD appears in one of those strings, it should be removed from that string.

val wordList = sc.textFile("wordList.txt").map(x => x.split(',')).map(x => x(0))

Sample of wordList:

res15: Array[String] = Array(basetting, choosinesses, concavenesses, crabbinesses, cupidinously, falliblenesses, fleecinesses, hackishes, immaterialnesses, impiousnesses)

Then I have my other RDD:

val filterWord = posts.map(x => (x._1, x._2.split(" ").filter(x => x != (wordList)))

Sample filterWord:

res16: Array[(String, Array[String])] = Array((6,Array(how, sweet, is, it, that, we, have)), (2,Array("")), (2,Array(will, this, question, cause, an, error)), (2,Array("")), (4,Array(how, do, we, create, a, new, tag, in)), (7,Array("")), (2,Array(test, after, clr, on)), (2,Array("")), (2,Array(testing, a, long, tag)), (2,Array("")))

I need filterWord to contain only the words that are not in wordList, but this doesn't seem to work: it does not filter out any of the words in wordList, and if I change != to == it filters out everything.

Answer 1:

This removes any post that contains any of the words in wordList. It may or may not be what you want. Please do clarify your question.

Spark setup.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc = new SparkContext(conf)

Test data:

val jabberwocky = """
Twas brillig, and the slithy toves
      Did gyre and gimble in the wabe:
All mimsy were the borogoves,
      And the mome raths outgrabe.

“Beware the Jabberwock, my son!
      The jaws that bite, the claws that catch!
Beware the Jubjub bird, and shun
      The frumious Bandersnatch!”

He took his vorpal sword in hand;
      Long time the manxome foe he sought—
So rested he by the Tumtum tree
      And stood awhile in thought.

And, as in uffish thought he stood,
      The Jabberwock, with eyes of flame,
Came whiffling through the tulgey wood,
      And burbled as it came!

One, two! One, two! And through and through
      The vorpal blade went snicker-snack!
He left it dead, and with its head
      He went galumphing back.

“And hast thou slain the Jabberwock?
      Come to my arms, my beamish boy!
O frabjous day! Callooh! Callay!”
      He chortled in his joy.

’Twas brillig, and the slithy toves
      Did gyre and gimble in the wabe:
All mimsy were the borogoves,
      And the mome raths outgrabe
"""
val words = "the and in all were"

Convert the test data to RDDs.

val posts = sc.parallelize(jabberwocky.split('\n')
                                      .filter(_.nonEmpty)
                                      .zipWithIndex
                                      .map (_.swap))

val wordList = sc.parallelize(words.split(' ')).map(x => (x.toLowerCase(), x))

Make a PairRDD with one row for each word in each post. The key is the lowercased word, and the value is the original post (index and text).

val postsPairs = posts.flatMap {
    case (i, s) => s.split("\\W+").map(w => (w.toLowerCase(), (i, s)))
}
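
To see which keys the flatMap above would produce for a single line, here is a minimal sketch in plain Scala (no Spark needed). The names TokenizeSketch and tokens are hypothetical and only mirror the split-and-lowercase step.

```scala
object TokenizeSketch {
  // Split on runs of non-word characters and lowercase each token,
  // mirroring the split("\\W+") / toLowerCase step used for postsPairs.
  def tokens(line: String): Array[String] =
    line.split("\\W+").map(_.toLowerCase)

  def main(args: Array[String]): Unit = {
    // An indented line starts with a separator, so the first token is "".
    tokens("      Did gyre and gimble in the wabe:")
      .foreach(t => println("[" + t + "]"))
  }
}
```

Indented lines yield a leading empty-string token. That key never matches an entry in wordList, so it should be harmless for the join.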

Find all the posts that DO have one of the excluded words:

  val withExcluded = postsPairs.join(wordList).map(_._2._1)

(could do a .distinct here but there's no point, the duplicates won't matter for the next step)

Remove from the original list every post that has one of the excluded words, so that the remaining posts contain none of them. WWWWW.

  val res = posts.subtract(withExcluded)

  // (19,      He went galumphing back.)
  // (22,O frabjous day! Callooh! Callay!”)
  // (21,      Come to my arms, my beamish boy!)
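
If the goal is instead to keep every post and drop only the listed words from it, a different approach is needed. The filter in the question compares each word with the RDD itself. A String never equals an RDD, so != keeps everything and == removes everything. Below is a minimal sketch of a per-word filter in plain Scala. WordFilterSketch and keepUnlisted are hypothetical names, and it assumes the word list is small enough to hold in memory as a lowercase Set.

```scala
object WordFilterSketch {
  // Keep only the tokens of `text` that are absent from `excluded`.
  // Comparison is case-insensitive; `excluded` is assumed to be lowercase.
  def keepUnlisted(excluded: Set[String])(text: String): Array[String] =
    text.split("\\W+")
      .filter(w => w.nonEmpty && !excluded.contains(w.toLowerCase))

  def main(args: Array[String]): Unit = {
    val excluded = Set("the", "and", "in", "all", "were")
    // prints "mimsy borogoves"
    println(keepUnlisted(excluded)("All mimsy were the borogoves,").mkString(" "))
  }
}
```

In Spark, the same function could probably be applied with something like posts.mapValues(WordFilterSketch.keepUnlisted(excludedSet)), where excludedSet is collected from the word list on the driver and optionally wrapped in sc.broadcast. This is an untested assumption about your data sizes rather than a drop-in fix.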