I want to load a Cassandra table into a DataFrame in Spark. I followed the sample programs below (found in this answer), but I am getting the exception shown further down. I also tried loading the table into an RDD first and then converting it to a DataFrame; loading the RDD succeeds, but the conversion to a DataFrame throws the same exception as the first approach. Any suggestions? I am using Spark 2.0.0, Cassandra 3.7, and Java 8.
import java.util.HashMap;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkCassandraDatasetApplication {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkCassandraDatasetApplication")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .master("local[2]")
                .getOrCreate();

        // Read the Cassandra table into a DataFrame -- this is the line
        // that throws the exception shown below.
        Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "mykeyspace");
                        put("table", "mytable");
                    }
                }).load();

        // Print data
        dataset.show();

        spark.stop();
    }
}
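The same read can also be expressed with explicit .option() calls instead of the anonymous HashMap subclass; as far as I can tell the two forms are equivalent, so I assume the failure is not in how the options are passed:

Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "mykeyspace")
        .option("table", "mytable")
        .load();

(The double-brace form works too; it just creates an anonymous inner class that holds a reference to the enclosing class.)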
When I submit it, I get this exception:
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.s3.S3FileSystem not found
    at java.util.ServiceLoader.fail(ServiceLoader.java:239)
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2623)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2634)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:115)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145)
Using the RDD method to read from Cassandra succeeds (I have verified it with a count() call), but converting the RDD to a DataFrame throws the same exception as in the first method.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.datastax.spark.connector.japi.CassandraJavaUtil;

public class SparkCassandraRDDApplication {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("App")
                .config("spark.sql.warehouse.dir", "/file:/opt/spark/temp")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .master("local[2]")
                .getOrCreate();

        SparkContext sc = spark.sparkContext();

        // Read the table into an RDD of UserData beans -- this works.
        JavaRDD<UserData> resultsRDD = javaFunctions(sc)
                .cassandraTable("mykeyspace", "mytable",
                        CassandraJavaUtil.mapRowTo(UserData.class));

        // This is again throwing the same exception.
        Dataset<Row> usersDF = spark.createDataFrame(resultsRDD, UserData.class);

        // Print
        resultsRDD.foreach(data -> {
            System.out.println(data.id);
            System.out.println(data.username);
        });

        sc.stop();
    }
}
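For reference, UserData is a plain Serializable JavaBean along the following lines (only the field names are taken from the code above; the column types are illustrative and the real class may have more fields):

import java.io.Serializable;

public class UserData implements Serializable {
    // Public fields so the foreach above can read them directly;
    // types are assumptions about my schema.
    public Integer id;
    public String username;

    // No-arg constructor, as required for mapRowTo(UserData.class)
    public UserData() {}

    // Getters/setters so mapRowTo can populate the bean and
    // createDataFrame can infer the schema from it.
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
}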