Parsing multiline records in Scala

Posted 2020-01-26 12:07

Question:

Here is my RDD[String]

M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4

M2 module2
PIP a I n4
PIP b O D
PIP c O n5

and so on. Basically, I need an RDD keyed by the second word of the first line of each record, with the subsequent PIP lines as values that can be iterated over.

I've tried the following

val usgPairRDD = usgRDD.map(x => (x.split("\\n")(0), x))

but this gives me the following output

(,)
(M1 module1,M1 module1)
(PIP a Z A,PIP a Z A)
(PIP b Z B,PIP b Z B)
(PIP c Y n4,PIP c Y n4)
(,)
(M2 module2,M2 module2)
(PIP a I n4,PIP a I n4)
(PIP b O D,PIP b O D)
(PIP c O n5,PIP c O n5)

Instead, I'd like the output to be

module1, (PIP a Z A, PIP b Z B, PIP c Y n4)
module2, (PIP a I n4, PIP b O D, PIP c O n5)

What am I doing wrong? I am quite new to Spark APIs. Thanks

Hi @zero323

usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))

yields...

%%%%%%%%%
M1 module1%%%%%%%%%
PIP a Z A%%%%%%%%%
PIP b Z B%%%%%%%%%
PIP c Y n4%%%%%%%%%
%%%%%%%%%
M2 module2%%%%%%%%%
PIP a I n4%%%%%%%%%
PIP b O D%%%%%%%%%
PIP c O n5%%%%%%%%%

and so on

Hi @zero323 and @Daniel Darabos, my input is a very large set of many files (spanning TBs). Here is a sample:

BIN n4
BIN n5
BIN D
BIN E
PIT A I A
PIT B I B 
PIT C I C
PIT D O D
PIT E O E
DEF M1 module1
   PIP a Z A
   PIP b Z B
   PIP c Y n4
DEF M2 module2
   PIP a I n4
   PIP b O D
   PIP c O n5

I need all the BIN, PIT and DEF lines (including the PIP lines below them) in 3 different RDDs. Here is how I am doing it currently (from the discussion, I sense usgRDD below is computed incorrectly):

val binRDD = levelfileRDD.filter(line => line.contains("BIN"))
val pitRDD = levelfileRDD.filter(line => line.contains("PIT"))
val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))

I need 3 types of RDDs (at the moment) because I need to perform validation later on. For example, "n4" under "DEF M2 module2" can only exist if n4 is a BIN element. From the RDDs I hope to derive relationships using the GraphX APIs (I have obviously not got to that point yet). It would be ideal if each usgPairRDD (computed from usgRDD or otherwise) printed the following:

module1, (a Z A, b Z B, c Y n4) %%%%%%%
module2, (a I n4, b O D, c O n5) %%%%%%%

I hope I am making sense. Apologies to the Spark Gods, if I am not.

Answer 1:

By default, Spark creates a single element per line. This means that in your case every record is spread over multiple elements which, as noted by Daniel Darabos in the comments, may even be processed by different workers.
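
Because each element is already a single line, the split in your map never actually splits anything, so the key ends up being the whole line:

// each element of usgRDD is a single line, so splitting on "\n" is a no-op
"PIP a Z A".split("\\n")(0)   // returns "PIP a Z A" -- identical to the value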

Since it looks like your data is relatively regular and records are separated by an empty line, you should be able to use newAPIHadoopFile with a custom record delimiter:

import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val path: String = ???

val conf = org.apache.hadoop.mapreduce.Job.getInstance().getConfiguration
conf.set("textinputformat.record.delimiter", "\n\n")

// one element per blank-line-separated record; drop the Hadoop offset key
val usgRDD = sc.newAPIHadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map { case (_, v) => v.toString }

// first line of each record becomes the key, the remaining lines the values
val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map { record =>
  record.split("\n") match {
    case Array(x, xs @ _*) => (x, xs)
  }
}
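
If you additionally want the key to be just the module name (the second word of the header line) and the values without the PIP prefix, as in your expected output, you can post-process the pairs. A minimal sketch, assuming each record starts with a "<id> <moduleName>" header followed by PIP lines (adjust the index if your headers start with DEF, as in your second sample):

// strip the header down to the module name and drop the "PIP" prefix from each value
val moduleRDD: RDD[(String, Seq[String])] = usgPairRDD.map { case (header, pips) =>
  val moduleName = header.split("\\s+")(1)              // e.g. "module1"
  val args = pips.map(_.trim.stripPrefix("PIP").trim)   // e.g. "a Z A"
  (moduleName, args)
}

which should give you pairs like (module1, List(a Z A, b Z B, c Y n4)).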

In Spark 2.4 or later, the data loading part can also be done with the Dataset API:

import org.apache.spark.sql.Dataset

val ds: Dataset[String] = spark.read.option("lineSep", "\n\n").textFile(path)
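
The resulting Dataset can then be split into pairs the same way. A rough sketch, assuming spark.implicits._ is in scope for the encoders:

import spark.implicits._

// split each blank-line-delimited record into its header line and the remaining lines
val pairDS = ds.map { record =>
  val lines = record.split("\n")
  (lines.head, lines.tail.toSeq)
}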