Combining files

Posted 2019-06-14 10:50

I am new to Scala. I have two RDDs and need to separate my training and testing data. One file contains all the data and the other contains only the testing data. I need to remove the testing data from the complete data set.

The complete data file has the format (userID, MovID, Rating, Timestamp):

res8: Array[String] = Array(1, 31, 2.5, 1260759144)

The test data file has the format (userID, MovID):

res10: Array[String] = Array(1, 1172)

How do I generate ratings_train so that it does not contain the cases that match the testing data set? I am using the following function, but the returned list is empty:

  def create_training(data: RDD[String], ratings_test: RDD[String]): ListBuffer[Array[String]] = {
    val ratings_split = dropheader(data).map(line => line.split(","))
    val ratings_testing = dropheader(ratings_test).map(line => line.split(",")).collect()
    var ratings_train = new ListBuffer[Array[String]]()
    ratings_split.foreach(x => {
      ratings_testing.foreach(y => {
        if (x(0) != y(0) || x(1) != y(1)) {
          ratings_train += x
        }
      })
    })
    return ratings_train
  }

EDIT: changed code but running into memory issues.

1 answer
冷血范
#2 · 2019-06-14 11:25

This may work.

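import org.apache.spark.rdd.RDD

def create_training(data: RDD[String], ratings_test: RDD[String]): RDD[Array[String]] = {
  val ratings_split = dropheader(data).map(line => line.split(","))
  // Collect the test rows to the driver: an RDD cannot be used inside another RDD's closure
  val ratings_testing = dropheader(ratings_test).map(line => line.split(",")).collect()

  // Keep a row only if no test row has the same userID and MovID.
  // The result stays an RDD so the full training set is not pulled onto the driver.
  ratings_split.filter(x =>
    !ratings_testing.exists(y => x(0) == y(0) && x(1) == y(1))
  )
}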

  1. The code you posted is not logically correct. A row should be part of the final data only if it matches none of the test rows. Your loop, however, adds the row every time it differs from some test row, so a row that matches one test row is still added once for each of the other test rows. For example, with test rows (1, 31) and (2, 30), the rating for (1, 31) differs from (2, 30) and is therefore added anyway. The check has to hold against all of the test data before the row can be accepted.
  2. You are using RDDs but not exploiting their full power. I guess you are reading the input from a CSV file. In that case you can give the data a structure instead of splitting each string on the comma character and processing the pieces manually as a row. Take a look at Spark's DataFrame API. These links may help: https://www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm , http://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes
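
The DataFrame route from point 2 might look like the sketch below. It assumes both inputs carry the column names userID and MovID (as in the question) and uses a left anti join, which keeps only the rows of the left side that have no match on the right. The object name AntiJoinSketch, the trainingSet helper and the in-memory sample rows are made up for illustration; they are not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AntiJoinSketch {
  // Keep every rating whose (userID, MovID) pair does not occur in the test set.
  // "left_anti" returns the rows of `ratings` that have no match in `test`.
  def trainingSet(ratings: DataFrame, test: DataFrame): DataFrame =
    ratings.join(test, Seq("userID", "MovID"), "left_anti")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, for illustration only
      .appName("anti-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // In-memory stand-ins for the two CSV files (hypothetical sample rows)
    val ratings = Seq(
      ("1", "31", "2.5", "1260759144"),
      ("2", "31", "2.5", "1260759144")
    ).toDF("userID", "MovID", "Rating", "Timestamp")
    val test = Seq(("1", "31"), ("2", "30"), ("30", "2")).toDF("userID", "MovID")

    trainingSet(ratings, test).show() // only the row for user 2, movie 31 remains
    spark.stop()
  }
}
```

With real files, the two DataFrames could be loaded with spark.read.option("header", "true").csv(path), which would also make a separate header-dropping step unnecessary.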

Using Regex:

  def main(args: Array[String]): Unit = {
    // creating test data set
    val data = spark.sparkContext.parallelize(Seq(
      //      "userID, MovID, Rating, Timestamp",
      "1, 31, 2.5, 1260759144",
      "2, 31, 2.5, 1260759144"))

    val ratings_test = spark.sparkContext.parallelize(Seq(
      //      "userID, MovID",
      "1, 31",
      "2, 30",
      "30, 2"
    ))

    val result = getData(data, ratings_test).collect()
    // the result will only contain "2, 31, 2.5, 1260759144"
  }

  def getData(data: RDD[String], ratings_test: RDD[String]): RDD[String] = {
    val ratings = dropheader(data)
    val ratings_testing = dropheader(ratings_test)

    // Broadcast the test ratings to all Spark nodes, since we collect them beforehand.
    // The test data is collected here to avoid calling collect inside the filter logic.
    val ratings_testing_bc = spark.sparkContext.broadcast(ratings_testing.collect.toSet)

    ratings.filter(rating => {
      !ratings_testing_bc.value.exists(testRating => regexMatch(rating, testRating))
    })
  }

  def regexMatch(data: String, testData: String): Boolean = {
    // Regular expression capturing the first two columns (separated by ", ")
    val regex = """^([^,]*), ([^,\r\n]*),?""".r

    // Returns the first two columns, or None when the line does not match
    def firstTwoColumns(line: String): Option[(String, String)] =
      regex.findFirstIn(line).collect { case regex(col1, col2) => (col1, col2) }

    // Malformed lines never count as a match, instead of throwing a MatchError
    (firstTwoColumns(data), firstTwoColumns(testData)) match {
      case (Some(dataCols), Some(testCols)) => dataCols == testCols
      case _ => false
    }
  }
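
Collecting the test set and scanning it for every rating needs the whole test set to fit in memory, and the work grows with (number of ratings × number of test rows). A possible alternative that stays within the RDD API is to key both RDDs by (userID, MovID) and call subtractByKey, which drops every pair whose key also appears in the other RDD. This is a different technique from the regex match above: it uses a plain split, and it assumes comma-separated lines with at least two columns and with the header already removed. SubtractSketch, keyOf and trainingRows are illustrative names, not part of the original answer.

```scala
import org.apache.spark.rdd.RDD

object SubtractSketch {
  // Build the (userID, MovID) key from a comma-separated line.
  // trim tolerates both "1,31" and "1, 31"; assumes at least two columns.
  def keyOf(line: String): (String, String) = {
    val cols = line.split(",").map(_.trim)
    (cols(0), cols(1))
  }

  // Assumes the header lines have already been dropped from both RDDs.
  def trainingRows(ratings: RDD[String], test: RDD[String]): RDD[String] = {
    val keyedRatings = ratings.map(line => (keyOf(line), line))
    val keyedTest = test.map(line => (keyOf(line), ()))
    // Drop every rating whose key also occurs in the test set
    keyedRatings.subtractByKey(keyedTest).values
  }
}
```

It could be called as SubtractSketch.trainingRows(dropheader(data), dropheader(ratings_test)). Because nothing is collected to the driver, the test set does not have to fit in a single machine's memory.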