I need to write JSON data from a Spark DataFrame into Cosmos DB, using Scala in IntelliJ. I read a CSV file from my local machine and convert it to JSON. Now I need to write that JSON data into a Cosmos DB collection.
The Spark version is 2.2.0 and the Scala version is 2.11.8.
Below is the code I wrote in IntelliJ with Scala to read a CSV file from my local machine and convert it to JSON:
import org.apache.spark.sql.SparkSession
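import com.microsoft.azure.cosmosdb.spark.config.Config
import com.microsoft.azure.cosmosdb.spark.schema._ // implicits needed for the .cosmosDB(...) call on the writer below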
object DataLoadConversion {
def main(args: Array[String]): Unit = {
System.setProperty("spark.sql.warehouse.dir", "file:///C:/spark-warehouse")
val spark = SparkSession.builder().master("local").appName("DataConversion").getOrCreate()
val df = spark.read.format("com.databricks.spark.csv")
.option("quote", "\"")
.option("escape", "\"")
.option("delimiter", ",")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema","true")
.load("file:///C:/Users/an/Desktop/ct_temp.csv")
val finalDf = df.select(df("history_temp_id").as("NUM"),df("history_temp_time").as("TIME"))
val jsonData = finalDf.select("NUM", "TIME").toJSON
jsonData.show(2)
// COSMOS DB Write configuration
val writeConfig = Config(Map(
"Endpoint" -> "https://cosms.documents.azure.com:443/",
"Masterkey" -> "YOUR-KEY-HERE", //provided primary key
"Database" -> "DBName", //provided with DB name
"Collection" -> "Collection" //provided with collection name (no trailing comma: Scala 2.11 does not allow it)
))
// Write to Cosmos DB from the DataFrame
import org.apache.spark.sql.SaveMode
jsonData.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
}
}
Below is the build.sbt file:
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"com.databricks" %% "spark-csv" % "1.5.0",
)
libraryDependencies += "com.microsoft.azure" % "azure-cosmosdb-spark_2.2.0_2.11" % "1.1.1" % "provided" exclude("org.apache.spark", "spark-core_2.10")
I added the Cosmos DB dependency to the build.sbt file.
I am new to Spark and Scala. Please let me know which steps I need to follow to connect to Cosmos DB from IntelliJ with Spark and Scala.
The build succeeds, but I get the error below when running the code:
19/07/10 16:32:41 INFO DocumentClient: Initializing DocumentClient with serviceEndpoint [https://cosms.documents.azure.com/], ConnectionPolicy [ConnectionPolicy [requestTimeout=60, mediaRequestTimeout=300, connectionMode=Gateway, mediaReadMode=Buffered, maxPoolSize=400, idleConnectionTimeout=60, userAgentSuffix= SparkConnector/2.2.0_2.11-1.1.1, retryOptions=com.microsoft.azure.documentdb.RetryOptions@1ef5cde4, enableEndpointDiscovery=true, preferredLocations=[Japan East]]], ConsistencyLevel [Session]
19/07/10 16:33:03 WARN DocumentClient: Failed to retrieve database account information. org.apache.http.conn.HttpHostConnectException: Connect to cosms.documents.azure.com:443 [cosms.documents.azure.com/13.78.51.35] failed: Connection timed out: connect
......
Exception in thread "main" java.lang.IllegalStateException: Http client execution failed.
at com.microsoft.azure.documentdb.internal.GatewayProxy.performGetRequest(GatewayProxy.java:244)
at com.microsoft.azure.documentdb.internal.GatewayProxy.doRead(GatewayProxy.java:93)
If I connect from outside my office network, this works, but when my machine is connected to the office network I get the error above. I have tried configuring the proxy on the Settings >> Proxy settings page.
If I open the same endpoint in Chrome, I get the response below:
{"code":"Unauthorized","message":"Required Header authorization is missing. Ensure a valid Authorization token is passed.\r\nActivityId: 54999e41-179e-4877-b8bf-f2c2a33280fd, Microsoft.Azure.Documents.Common/2.5.1"}
How can I resolve this? How do I bypass the proxy on the office network?
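To narrow this down, a small JVM-side probe might help. It is only a diagnostic sketch that uses plain JDK classes, not the Cosmos DB connector. It sends one unauthenticated GET to the endpoint, once directly and once through a proxy, so the two results can be compared with what Chrome shows. The proxy host `proxy.example.corp` and port `8080` are placeholders I made up, and the names `ProxyProbe`, `probe` and `describe` are hypothetical. A 401 here would mean the network path works, since the request carries no authorization header.

```scala
import java.net.{HttpURLConnection, InetSocketAddress, Proxy, URL}
import scala.util.Try

object ProxyProbe {
  // Interprets the HTTP status of an unauthenticated GET against the endpoint.
  def describe(status: Int): String = status match {
    case 401 => "reachable (401 Unauthorized is expected without an auth header)"
    case s if s >= 200 && s < 500 => s"reachable (HTTP $s)"
    case s => s"reached a server, but it answered HTTP $s"
  }

  // Sends one GET, directly (None) or through the given HTTP proxy (Some(host, port)),
  // and returns the status code, or a Failure carrying the connection error.
  def probe(endpoint: String, proxy: Option[(String, Int)], timeoutMs: Int = 10000): Try[Int] = Try {
    val url = new URL(endpoint)
    val conn = proxy match {
      case Some((host, port)) =>
        url.openConnection(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port)))
      case None =>
        url.openConnection(Proxy.NO_PROXY)
    }
    val http = conn.asInstanceOf[HttpURLConnection]
    http.setConnectTimeout(timeoutMs)
    http.setReadTimeout(timeoutMs)
    http.setRequestMethod("GET")
    try http.getResponseCode finally http.disconnect()
  }

  def main(args: Array[String]): Unit = {
    val endpoint = "https://cosms.documents.azure.com:443/"
    // Placeholder proxy address (an assumption): replace with the real office proxy.
    val officeProxy = Some(("proxy.example.corp", 8080))
    println("direct: " + probe(endpoint, None).map(describe))
    println("proxy:  " + probe(endpoint, officeProxy).map(describe))
  }
}
```

If the direct call times out while the proxied call returns 401, that would suggest the JVM running the Spark job is not going through the office proxy, unlike the browser.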