Best way to convert an online CSV to a DataFrame in Scala

Published 2019-08-17 09:29

Question:

I am trying to figure out the most efficient way to load this online CSV file into a DataFrame in Scala.

To save you a download, the CSV file referenced in the code looks like this:

"Symbol","Name","LastSale","MarketCap","ADR TSO","IPOyear","Sector","Industry","Summary Quote"
"DDD","3D Systems Corporation","18.09","2058834640.41","n/a","n/a","Technology","Computer Software: Prepackaged Software","http://www.nasdaq.com/symbol/ddd"
"MMM","3M Company","211.68","126423673447.68","n/a","n/a","Health Care","Medical/Dental Instruments","http://www.nasdaq.com/symbol/mmm"
....
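Note that every field in the file is quoted. A quick way to check parsing logic locally, before involving Spark, is plain Scala with a quote-aware split; the `parseCsvLine` helper and its regex below are my own sketch, not part of the question:

```scala
// Split a CSV line only on commas that sit OUTSIDE double quotes
// (the lookahead requires an even number of quotes after the comma),
// then strip the surrounding quotes from each field.
def parseCsvLine(line: String): Array[String] =
  line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")
    .map(_.trim.stripPrefix("\"").stripSuffix("\""))

val header =
  "\"Symbol\",\"Name\",\"LastSale\",\"MarketCap\",\"ADR TSO\",\"IPOyear\",\"Sector\",\"Industry\",\"Summary Quote\""
val fields = parseCsvLine(header)
// fields(0) == "Symbol", fields(4) == "ADR TSO", 9 fields in total
```

Unlike a bare `split(",")`, this keeps a field such as `"Foo, Inc."` in one piece.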

From my research, I start by downloading the CSV and placing it into a ListBuffer (a plain List won't work here, since it's immutable):

import scala.collection.mutable.ListBuffer

val sc = new SparkContext(conf)

var stockInfoNYSE_ListBuffer = new ListBuffer[java.lang.String]()


import scala.io.Source
val bufferedSource =
  Source.fromURL("http://www.nasdaq.com/screening/companies-by-industry.aspx?exchange=NYSE&render=download")

for (line <- bufferedSource.getLines) {
    val cols = line.split(",").map(_.trim)

    stockInfoNYSE_ListBuffer += s"${cols(0)},${cols(1)},${cols(2)},${cols(3)},${cols(4)},${cols(5)},${cols(6)},${cols(7)},${cols(8)}"

}
bufferedSource.close

val stockInfoNYSE_List = stockInfoNYSE_ListBuffer.toList

So we have a list. You can basically get each value like this:

// SYMBOL : stockInfoNYSE_List(1).split(",")(0)
// COMPANY NAME : stockInfoNYSE_List(1).split(",")(1)
// IPOYear : stockInfoNYSE_List(1).split(",")(5)
// Sector : stockInfoNYSE_List(1).split(",")(6)
// Industry : stockInfoNYSE_List(1).split(",")(7)
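Re-splitting the stored string on every access is clumsy; the split can be done once into a case class. A plain-Scala sketch (the `Stock` case class and `toStock` helper are my own names, assuming the same nine comma-joined fields with no commas inside values):

```scala
case class Stock(symbol: String, name: String, lastSale: String,
                 marketCap: String, adrTso: String, ipoYear: String,
                 sector: String, industry: String, summaryQuote: String)

// Strip the quotes, split once, and bind each field to a named slot.
def toStock(line: String): Stock = {
  val f = line.replace("\"", "").split(",")
  Stock(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8))
}

val row = toStock("\"DDD\",\"3D Systems Corporation\",\"18.09\",\"2058834640.41\"," +
  "\"n/a\",\"n/a\",\"Technology\",\"Prepackaged Software\",\"http://www.nasdaq.com/symbol/ddd\"")
// row.symbol == "DDD", row.sector == "Technology"
```

With a collection of `Stock` values, fields are accessed by name (`row.sector`) instead of by split index.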

Here is where I get stuck: how do I get this into a DataFrame? Below are the approaches I have tried without success. I haven't included all the fields yet; this was just a simple test.

case class StockMap(Symbol: String, Name: String)
val caseClassDS = Seq(StockMap(stockInfoNYSE_List(1).split(",")(0),
  stockInfoNYSE_List(1).split(",")(1))).toDS()

caseClassDS.show()

The problem with the approach above: I can only figure out how to add one row (sequence element) by hard-coding it. I want every row in the list.

My second failed attempt:

val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val test = stockInfoNYSE_List.toDF

This just gives you a single-column DataFrame of the raw strings, and I want the values divided up into separate columns.

Array(["Symbol","Name","LastSale","MarketCap","ADR TSO","IPOyear","Sector","Industry","Summary Quote"], ["DDD","3D Systems Corporation","18.09","2058834640.41","n/a","n/a","Technology","Computer Software: Prepackaged Software","http://www.nasdaq.com/symbol/ddd"], ["MMM","3M Company","211.68","126423673447.68","n/a","n/a","Health Care","Medical/Dental Instruments","http://www.nasdaq.com/symbol/mmm"],....... 

Answer 1:

case class TestClass(Symbol: String, Name: String, LastSale: String, MarketCap: String,
  ADR_TSO: String, IPOyear: String, Sector: String, Industry: String, Summary_Quote: String)

var stockDF = stockInfoNYSE_ListBuffer.drop(1)  // drop the header row

val demoDS = stockDF.map(line => {
  val fields = line.replace("\"", "").split(",")  // strip quotes, then split on commas
  TestClass(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7), fields(8))
})

demoDS.toDS.show

+------+--------------------+--------+---------------+-------------+-------+-----------------+--------------------+--------------------+
|Symbol|                Name|LastSale|      MarketCap|      ADR_TSO|IPOyear|           Sector|            Industry|       Summary_Quote|
+------+--------------------+--------+---------------+-------------+-------+-----------------+--------------------+--------------------+
|   DDD|3D Systems Corpor...|   18.09|  2058834640.41|          n/a|    n/a|       Technology|Computer Software...|http://www.nasdaq...|
|   MMM|          3M Company|  211.68|126423673447.68|          n/a|    n/a|      Health Care|Medical/Dental In...|http://www.nasdaq...|
+------+--------------------+--------+---------------+-------------+-------+-----------------+--------------------+--------------------+


Answer 2:

In case anyone is trying to get this example working, here is the code using the above solution:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import scala.collection.mutable.ListBuffer
import sqlContext.implicits._

var stockInfoNYSE_ListBuffer = new ListBuffer[java.lang.String]()

import scala.io.Source
val bufferedSource =
  Source.fromURL("http://www.nasdaq.com/screening/companies-by-industry.aspx?exchange=NYSE&render=download")

for (line <- bufferedSource.getLines) {
    val cols = line.split(",").map(_.trim)

    stockInfoNYSE_ListBuffer += s"${cols(0)},${cols(1)},${cols(2)},${cols(3)},${cols(4)},${cols(5)},${cols(6)},${cols(7)},${cols(8)}"

}
bufferedSource.close



case class TestClass(Symbol: String, Name: String, LastSale: String, MarketCap: String,
  ADR_TSO: String, IPOyear: String, Sector: String, Industry: String, Summary_Quote: String)

var stockDF = stockInfoNYSE_ListBuffer.drop(1)  // drop the header row

val demoDS = stockDF.map(line => {
  val fields = line.replace("\"","").split(",")
  TestClass(fields(0), fields(1), fields(2),fields(3), fields(4), fields(5),fields(6), fields(7), fields(8))
})

demoDS.toDF().show
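One caveat about both answers: `replace("\"", "").split(",")` silently mis-parses any row whose quoted field itself contains a comma, as this plain-Scala sketch shows (the `"Foo, Inc."` row is hypothetical, not from the NASDAQ file):

```scala
// A row where the company name contains a comma inside its quotes.
val tricky = "\"FOO\",\"Foo, Inc.\",\"10.00\""

// Stripping the quotes first destroys the information about which
// commas were field separators, so the name breaks into two fields.
val naive = tricky.replace("\"", "").split(",")
// naive has 4 elements: "FOO", "Foo", " Inc.", "10.00"
```

If your Spark version allows it, the built-in CSV reader in Spark 2.x+ (`spark.read.option("header", "true").csv(path)`) handles quoting and the header row for you, which avoids hand-rolled splitting entirely.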