Scala - Create IndexedDatasetSpark object

Posted 2019-07-23 13:44

I want to run the Spark RowSimilarity recommender on data obtained from MongoDB. For this purpose, I've written the code below, which takes input from Mongo and converts it to an RDD of objects. This needs to be passed to IndexedDatasetSpark, which is then passed to SimilarityAnalysis.rowSimilarityIDS.

import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )
  // Map each BSON document to (product_id, space-separated attribute tokens).
  val new_doc: RDD[(String, String)] = documents.map { case (_, doc) =>
    val attributes = doc.get("product_attribute_value").toString
      .replace("[ \"", "")
      .replace("\"]", "")
      .split("\" , \"")
      .map(value => value.toLowerCase.replace(" ", "-"))
      .mkString(" ")
    (doc.get("product_id").toString, attributes)
  }

  // (sc) is passed explicitly because IndexedDatasetSpark.apply takes the SparkContext implicitly.
  val myIDs = IndexedDatasetSpark(new_doc)(sc)

  SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
}

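readWriteSchema is defined elsewhere in my code and omitted above; it is a Mahout indexeddataset Schema, roughly like the sketch below (the exact delimiter values here are only a placeholder modelled on the Mahout docs, not my real schema):

import org.apache.mahout.math.indexeddataset.Schema

// Placeholder values modelled on the Mahout documentation examples.
val readWriteSchema = new Schema(
  "rowKeyDelim" -> "\t",
  "columnIdStrengthDelim" -> ":",
  "elementDelim" -> " ",
  "omitScore" -> false
)
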
I am unable to create an IndexedDatasetSpark that can be passed to SimilarityAnalysis.rowSimilarityIDS. Please help me with this.

Edit1:

I managed to create the IndexedDatasetSpark object and the code now compiles. I had to pass (sc) explicitly as the implicit SparkContext argument to IndexedDatasetSpark to fix this compile error:

Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext

Now, when I run it, it gives the error below:

Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext

I cannot figure out how to provide the DistributedContext.

Is this the proper way to create the RDD and convert it to an IDS so that it can be processed by rowSimilarityIDS?

More context: I started from the situation described in this question: Run Mahout RowSimilarity recommender on MongoDB data

My build.sbt:

name := "scala-mongo"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
  "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)

libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"

resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/"

resolvers += Resolver.mavenLocal

Edit2: I have temporarily removed dfsWrite to let the code execute, and stumbled upon the error below:

java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
    - object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
    - field (class: scala.Some, name: x, type: class java.lang.Object)
    - object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Is there some serialisation step that I may have skipped?
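From what I can tell, Mahout's Spark bindings want Kryo serialisation configured on the SparkConf; this is my guess at what is missing (the registrator class name is an assumption taken from the Mahout 0.11 sources, I have not verified it):

import org.apache.spark.{SparkConf, SparkContext}

// Guess: register Kryo the way Mahout's own context helper does, so that
// Mahout vectors such as DenseVector can be serialised between executors.
val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
    "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
val sc = new SparkContext("local", "SparkExample", sparkConf)
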

1 Answer
太酷不给撩
Answered 2019-07-23 14:15

I'd put back whatever you removed; the secondary error is self-inflicted.

The original error is because you haven't created the SparkContext the Mahout way, i.e. as a DistributedContext, which can be done like this:

implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")

After that, I think the implicit conversion from mc (a SparkDistributedContext) to sc (a SparkContext) will be handled by the package helper functions. If the sc is still missing, try:

implicit val sc = sdc2sc(mc)
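
Putting that together with the code from your question, the wiring would look roughly like this (a sketch only; I'm reusing your master URL, app name, new_doc RDD and readWriteSchema, and mahoutSparkContext should also take care of the Kryo registration behind your NotSerializableException):

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.SparkContext

// The Mahout helper builds the SparkContext, registers Kryo for Mahout's
// math classes, and provides the DistributedContext that dfsWrite needs.
implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")
implicit val sc: SparkContext = sdc2sc(mc)

// new_doc is the RDD[(String, String)] built from MongoDB in the question.
val myIDs = IndexedDatasetSpark(new_doc)(sc)
SimilarityAnalysis.rowSimilarityIDS(myIDs)
  .dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)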