Extract specific columns from a text file to make

Posted 2019-09-12 06:21

Question:

I need to clean some data in Scala. I have the following raw data in a text file:

06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0
06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160

I need to have all of them in a dataframe in the following way:

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+

I used the following code to try to produce the above dataframe:

  val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, true),
      StructField("sender_ip_1", StringType, true),
      StructField("receiver_ip_2", StringType, true),
      StructField("s_port_3", StringType, true),
      StructField("r_port_4", StringType, true),
      StructField("acknum_5", StringType, true),
      StructField("winnum_6", StringType, true),
      StructField("len_7", IntegerType, true)))

    ///////////////////////////////////////////////////make train dataframe
    val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
    val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(0)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      val fourth = Try(array(3).trim.split(" ")(0)) getOrElse ""
      val fifth = Try(array(4).trim.split(" ") (0)) getOrElse ""
      val sixth = Try(array(5).trim.split(" ") (0)) getOrElse ""
      val seventh = Try(array(6).trim.split(" ")(0)) getOrElse ""
      val eighth = Try(array(7).trim.split(" ")(0)) getOrElse ""

      val firstFixed = first.take(first.lastIndexOf("."))
      val secondfix = second.take(second.lastIndexOf("."))
      val thirdFixed = third.take(third.lastIndexOf("."))
      Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed, fourth,fifth,sixth,seventh,eighth))
    })
    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)

But the problem is that nothing is extracted from the third column onwards! Can you please guide me as to why the third column comes out empty? Thanks

Answer 1:

Your input lines do not all have the same number of fields, so getting the result you require is a bit tricky. For the input data you provided, the following can be a solution. You can adapt it as your needs grow.

import scala.util.Try
import org.apache.spark.sql.Row

val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {

  // array(0) holds "<time> IP <sender>.<port> > <receiver>.<port>: Flags [.]"
  val array1 = array(0).trim.split("IP")
  val array2 = array1(1).split(">")
  val array3 = array2(1).split(":")

  // lines that carry a "seq" field have ack and win shifted one position to the right
  val hasSeq = array(1).contains("seq")
  val acknum5 = if (hasSeq) array(2) else array(1)
  val winnum6 = if (hasSeq) array(3) else array(2)
  // "length N" is always the last comma-separated field
  val len7 = array.last

  val first = Try(array1(0).trim) getOrElse ""
  val second = Try(array2(0).trim) getOrElse ""
  val third = Try(array3(0).trim) getOrElse ""
  val sixth = Try(acknum5.trim.split(" ")(1)) getOrElse ""
  val seventh = Try(winnum6.trim.split(" ")(1)) getOrElse ""
  val eighth = Try(len7.trim.split(" ")(1).toInt) getOrElse 0

  // the port is whatever follows the last dot of "ip.port"
  val secondfix = second.take(second.lastIndexOf("."))
  val sport3 = second.substring(second.lastIndexOf(".")+1, second.length)
  val thirdFixed = third.take(third.lastIndexOf("."))
  val rport4 = third.substring(third.lastIndexOf(".")+1, third.length)

  Row.fromSeq(Seq(first, secondfix, thirdFixed, sport3,rport4,sixth,seventh,eighth))
})
val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

You will get the following output:

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
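
The address handling in this answer relies on tcpdump appending the port to the IP address after a final dot. A minimal standalone sketch of just that step, in plain Scala without Spark, might look like this (HostPort and splitHostPort are hypothetical names used for illustration, not part of the answer's code):

```scala
object HostPort {
  // Splits "10.0.0.1.5001" into ("10.0.0.1", "5001") at the last dot.
  // Surrounding whitespace and a trailing ":" are removed first.
  // If there is no dot, the whole string is returned as the host and the port is empty.
  def splitHostPort(endpoint: String): (String, String) = {
    val cleaned = endpoint.trim.stripSuffix(":")
    val idx = cleaned.lastIndexOf('.')
    if (idx < 0) (cleaned, "")
    else (cleaned.substring(0, idx), cleaned.substring(idx + 1))
  }
}
```

Keeping this in one small function avoids repeating the lastIndexOf arithmetic for sender and receiver.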

I hope the solution is helpful



Answer 2:

After your first map

file.map(line => line.split(">") ).collect

the output that you have is:

Array[Array[String]] = Array(
Array("06:36:15.718068 IP 10.0.0.1.5001 ", " 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0"), 
Array("06:36:15.718078 IP 10.0.0.2.41516 ", " 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160"))

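As you can see, each line is split into an array of only two elements, so in your next map step any index beyond array(1) throws an ArrayIndexOutOfBoundsException. Your Try catches that exception and falls back to the empty string, which is why those columns come out empty.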
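
To reproduce this without Spark, here is a minimal sketch in plain Scala using the first sample line from the question (the object name SplitDemo is only for illustration):

```scala
import scala.util.Try

object SplitDemo {
  val line = "06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0"

  // Splitting on ">" yields exactly two pieces: the text before and after the arrow.
  val parts: Array[String] = line.split(">")

  // Index 2 does not exist, so the array access throws; Try hides the
  // exception and the fallback empty string ends up in the column.
  val third: String = Try(parts(2).trim.split(" ")(0).replace(":", "")) getOrElse ""

  def main(args: Array[String]): Unit = {
    println(parts.length)  // number of pieces produced by the split
    println(third.isEmpty) // whether the fallback value was used
  }
}
```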

You need to dig deep and check for characters on which you can split. This should work to extract the third column.

val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
val third = Dstream_Train.map(line => line.split(">") ).map( x => x(1).split(":")(0).splitAt(x(1).split(":")(0).lastIndexOf("."))._1 ).collect
third: Array[String] = Array(" 10.0.0.2", " 10.0.0.1")

Similarly, you can use that approach to get the other columns but, as suggested, RegEx should be cleaner and easier.
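
As a rough sketch of the regex route, a single pattern with an optional "seq" part can cover both sample lines. This is plain Scala without Spark; the names TcpdumpLine, Packet and parse are hypothetical, and the pattern is only an assumption based on the two lines shown in the question, so it would need checking against a real trace:

```scala
import scala.util.matching.Regex

object TcpdumpLine {
  // Hypothetical record type for one parsed line; fields follow the question's schema.
  final case class Packet(timeStamp: String, senderIp: String, receiverIp: String,
                          senderPort: String, receiverPort: String,
                          ackNum: String, winNum: String, len: Int)

  // One pattern for the whole line. The "seq a:b, " part is optional because
  // only some lines carry it; the options [...] content is skipped with .*?
  val LinePattern: Regex =
    """(\d{2}:\d{2}:\d{2}\.\d+) IP (\d+\.\d+\.\d+\.\d+)\.(\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+): Flags \[[^\]]*\], (?:seq \d+(?::\d+)?, )?ack (\d+), win (\d+),.*?length (\d+)""".r

  // Returns None for lines that do not fit the pattern instead of throwing.
  def parse(line: String): Option[Packet] = line match {
    case LinePattern(ts, sIp, sPort, rIp, rPort, ack, win, len) =>
      Some(Packet(ts, sIp, rIp, sPort, rPort, ack, win, len.toInt))
    case _ => None
  }
}
```

Because parse returns an Option, lines that do not match can be dropped or logged explicitly rather than silently producing empty columns.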



Answer 3:

There are a number of issues with this code. The first that needs to be mentioned is the schema. You declare len_7 as IntegerType, but when the value is missing you fall back to an empty String, so the declared type and the actual value do not match. That will need to be changed.
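
If you would rather keep an integer column, one option is to parse the value up front so that its type is explicit. A small sketch in plain Scala (LengthField and parseLength are hypothetical names; how a missing value should be represented in the Row, for example as null in a nullable column or as a default, is an assumption left to you):

```scala
import scala.util.Try

object LengthField {
  // Parses the length text to an Int; None stands for "missing or not a number".
  def parseLength(text: String): Option[Int] = Try(text.trim.toInt).toOption
}
```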

Also, the Array as pointed out above is an issue as you will get an indexing error.

I just saw Ramesh posted before me with an answer, but this is another way using RegEx.

RegEx is another way forward to solve this. If you look at your example, you should notice that the two rows are in fact structurally different (only the second one contains a seq field).

So here is what I did to get the outcome (albeit the Regex may need to be tested a bit more just in case). So start with the Regex cases:

import scala.util.matching.Regex

object RegexPatterns{ // this needs to be done this way to avoid serialisation errors
  // dots are escaped so they only match a literal "."; octets may have 1 to 3 digits
  val patternTS: Regex = "([0-9]+:[0-9]+:[0-9]+\\.[0-9]+)".r
  val patternSIP1: Regex = "(?<= IP )([0-9]{1,3}(?:\\.[0-9]{1,3}){3})(?=\\.[0-9]+)".r
  val patternRIP2: Regex = "(?<= > )([0-9]{1,3}(?:\\.[0-9]{1,3}){3})(?=\\.[0-9]+)".r
  // lookbehinds must have a bounded length, hence {1,3} rather than +
  val patternSP3: Regex = "(?<= IP [0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.)([0-9]+)".r
  val patternRP4: Regex = "(?<= > [0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.)([0-9]+)".r
  val patternAN5: Regex = "(?<=\\back \\b)([0-9]+)".r
  val patternWN6: Regex = "(?<=\\bwin \\b)([0-9]+)".r
  val patternL7: Regex = "(?<=\\blength \\b)([0-9]+)".r
}
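
To check a lookbehind pattern of this kind in isolation, it can be run against a sample line in plain Scala. The sketch below uses the second line from the question; LookbehindDemo and its field names are hypothetical and only there for illustration:

```scala
import scala.util.matching.Regex

object LookbehindDemo {
  val line = "06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160"

  // (?<=...) asserts that the given text precedes the match without
  // making it part of the match, so only the digits are returned.
  val ack: Regex = "(?<=\\back )([0-9]+)".r
  val length: Regex = "(?<=\\blength )([0-9]+)".r

  val ackValue: Option[String] = ack.findFirstIn(line)
  val lengthValue: Option[String] = length.findFirstIn(line)
}
```

Using findFirstIn gives an Option, which makes the "no match" case explicit instead of relying on an empty string.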

Then on to your already implemented code:

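import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import RegexPatterns._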

val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")

  val customSchema = StructType(Array(
    StructField("time_stamp_0", StringType, nullable = true),
    StructField("sender_ip_1", StringType, nullable = true),
    StructField("receiver_ip_2", StringType, nullable = true),
    StructField("s_port_3", StringType, nullable = true),
    StructField("r_port_4", StringType, nullable = true),
    StructField("acknum_5", StringType, nullable = true),
    StructField("winnum_6", StringType, nullable = true),
    StructField("len_7", StringType, nullable = true)))

  ///////////////////////////////////////////////////make train dataframe
  // Dstream_Train is the RDD[String] read with sc.textFile above; for a quick local
  // test, sparkContext.parallelize(input) over the sample lines was used instead.
  val Row_Dstream_Train: RDD[Row] = Dstream_Train.map { line: String =>
    val first = Try((patternTS findAllIn line).mkString(",")) getOrElse ""
    val second = Try((patternSIP1 findAllIn line).mkString(",")) getOrElse ""
    val third = Try((patternRIP2 findAllIn line).mkString(",")) getOrElse ""
    val fourth = Try((patternSP3 findAllIn line).mkString(",")) getOrElse ""
    val fifth = Try((patternRP4 findAllIn line).mkString(",")) getOrElse ""
    val sixth = Try((patternAN5 findAllIn line).mkString(",")) getOrElse ""
    val seventh = Try((patternWN6 findAllIn line).mkString(",")) getOrElse ""
    val eighth = Try((patternL7 findAllIn line).mkString(",")) getOrElse ""

    Row.fromSeq(Seq(first, second, third, fourth, fifth, sixth, seventh, eighth))
  }

  val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
  Frist_Dataframe.show(false)

This yields:

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+