I want to run the Spark RowSimilarity recommender on data obtained from MongoDB. For this purpose, I've written the code below, which reads from Mongo and converts the result to an RDD of Objects. This needs to be passed to IndexedDatasetSpark, which is then passed to SimilarityAnalysis.rowSimilarityIDS.
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  // Read the MongoDB collection as an RDD of (id, BSON document) pairs.
  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )

  // Map each document to (product_id, space-separated attribute tokens).
  val new_doc: RDD[(String, String)] = documents.map(
    doc1 => (
      doc1._2.get("product_id").toString(),
      doc1._2.get("product_attribute_value").toString()
        .replace("[ \"", "")
        .replace("\"]", "")
        .split("\" , \"")
        .map(value => value.toLowerCase.replace(" ", "-"))
        .mkString(" ")
    )
  )

  val myIDs = IndexedDatasetSpark(new_doc)(sc)

  // readWriteSchema is a Mahout Schema; see the note after this snippet.
  SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
}
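(readWriteSchema is a Mahout org.apache.mahout.math.indexeddataset.Schema; its definition is omitted above, but it is roughly along the lines of the sketch below, where the key names and delimiter values are only illustrative placeholders.)

import org.apache.mahout.math.indexeddataset.Schema

// Sketch only: a text-delimited write schema; the keys and values shown here
// are illustrative, not necessarily the exact ones I use.
val readWriteSchema = new Schema(
  "rowKeyDelim" -> "\t",
  "columnIdStrengthDelim" -> ":",
  "elementDelim" -> " ",
  "omitScore" -> false
)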
I am unable to create an IndexedDatasetSpark that can be passed to SimilarityAnalysis.rowSimilarityIDS. Please help me with this.
Edit1:
I managed to create the IndexedDatasetSpark object and the code now compiles properly. I had to add (sc) as an explicit argument for the implicit SparkContext parameter of IndexedDatasetSpark, because without it I was getting:
Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext
Now, when I run it, it gives the error below:
Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext
I cannot figure out how to provide the DistributedContext.
Is this the proper way to create the RDD and convert it to an IDS so that it can be processed by rowSimilarityIDS?
More context: I started from this question: Run Mahout RowSimilarity recommender on MongoDB data
My build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
Edit2: I have temporarily removed dfsWrite to let the code execute and stumbled upon the error below:
java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Is there some serialisation that I may have skipped?
Answer:
I'd put back whatever you removed; the secondary error is self-inflicted.
The original error is because you haven't created the Spark context the Mahout way, i.e. as a DistributedContext, which can be done:
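A minimal sketch, using Mahout's mahoutSparkContext helper from org.apache.mahout.sparkbindings (the master URL and app name here are placeholders):

import org.apache.mahout.sparkbindings._

// mahoutSparkContext returns a SparkDistributedContext, which is a
// DistributedContext, so an implicit val of this type satisfies the missing
// implicit DistributedContext parameter from the error above.
implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")

As far as I can tell, mahoutSparkContext also configures Kryo serialization with Mahout's Kryo registrator, which should take care of the DenseVector NotSerializableException from Edit2, since that comes from using the default Java serializer.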
Thereafter, I think the implicit conversion from mc (a SparkDistributedContext) to sc (a SparkContext) will be handled by the helper functions in the sparkbindings package object. If the sc is still missing, try:
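Something along these lines, assuming the sdc2sc helper from the org.apache.mahout.sparkbindings package object (it just unwraps the underlying SparkContext):

// Unwrap the plain SparkContext from the Mahout context created above.
implicit val sc = sdc2sc(mc)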