Spark and Cassandra Java Application Exception Pro

2020-08-02 07:03发布

问题:

I want to load cassandra table to a datafram in spark, I have followed the sample programes below (found in this answer), but I am getting an execption mentioned below, I have tried to load the table to RDD first then convert it to Datafrme, loading the RDD is successful, but when I try to convert it to a dataframe I am getting the same execption faced in the first methdology, any suggestions ? I am using Spark 2.0.0, Cassandra 3.7, and Java 8.

 public class SparkCassandraDatasetApplication {
 public static void main(String[] args) {
 SparkSession spark = SparkSession
          .builder()
          .appName("SparkCassandraDatasetApplication")
          .config("spark.sql.warehouse.dir", "/file:C:/temp")
          .config("spark.cassandra.connection.host", "127.0.0.1")
          .config("spark.cassandra.connection.port", "9042")
          .master("local[2]")
          .getOrCreate();

 //Read data to dataframe
 // this is throwing an exception
 Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "mykeyspace");
                put("table", "mytable");
            }
        }).load();

   //Print data
   dataset.show();       
   spark.stop();
   }        
}

When submitted I am getting this exception:

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.s3.S3FileSystem not found
at java.util.ServiceLoader.fail(ServiceLoader.java:239)
at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2623)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2634)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:115)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145)

Using the RDD method to read from cassandra is successful ( i have tested it with count() call), but converting the RDD to DF is throwing the same exception faced in the first method.

public class SparkCassandraRDDApplication {
public static void main(String[] args) {
             SparkSession spark = SparkSession
              .builder()
              .appName("App")
              .config("spark.sql.warehouse.dir", "/file:/opt/spark/temp")
              .config("spark.cassandra.connection.host", "127.0.0.1")
              .config("spark.cassandra.connection.port", "9042")
              .master("local[2]")
              .getOrCreate();

    SparkContext sc = spark.sparkContext();

    //Read
    JavaRDD<UserData> resultsRDD = javaFunctions(sc).cassandraTable("mykeyspace", "mytable",CassandraJavaUtil.mapRowTo(UserData.class));

    //This is again throwing an exception
    Dataset<Row> usersDF = spark.createDataFrame(resultsRDD, UserData.class);

    //Print
    resultsRDD.foreach(data -> {
        System.out.println(data.id);
        System.out.println(data.username);
    });

    sc.stop();
  }
}

回答1:

Please check if "hadoop-common-2.2.0.jar" is available in classpath. You can test your application by creating a jar including all the dependencies. Use below pom.xml in which maven-shade-plugin is used to include all the dependencies to create uber jar.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.abaghel.examples.spark</groupId>
<artifactId>spark-cassandra</artifactId>
<version>1.0.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.0-M3</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.abaghel.examples.spark.cassandra.SparkCassandraDatasetApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
  </build>
</project>

You can run the jar like below

spark-submit --class com.abaghel.examples.spark.cassandra.SparkCassandraDatasetApplication spark-cassandra-1.0.0-SNAPSHOT.jar