Spark Datastax Java API Select statements

2019-01-20 11:30发布

问题:

I'm using a tutorial here in this Github to run spark on cassandra using a java maven project: https://github.com/datastax/spark-cassandra-connector.

I've figured how to use direct CQL statements, as I have previously asked a question about that here: Querying Data in Cassandra via Spark in a Java Maven Project

However, now I'm trying to use the datastax java API in fear that my original code in my original question will not work for Datastax version of Spark and Cassandra. For some weird reason, it won't let me use .where even though it is outlined in the documentation that I can use that exact statement. Here is my code:

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
       }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

here is the error:

14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
    ... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
    ... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

I know that my error is specifically at this section:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

When I remove the .where(), it works. But it says specifically on github that you should be able to execute .where and .map functions respectively. Does anyone have any type of reasoning for this? or solution? Thanks.

edit i get the error to go away when i use this statement instead:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("id=?", "1").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

I have no idea why this option works but not the rest of my variations. Here are the statements i ran in my cql so that you know what my keyspace looks like:

    session.execute("DROP KEYSPACE IF EXISTS tester");
    session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
    session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
    session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
    session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0001," +
                  "'Angel'," +
                  "'Pay'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
    session.execute(
          "INSERT INTO tester.empByRole (id, fname, lname, role) " +
          "VALUES (" +
              "0001," +
              "'Angel'," +
              "'Pay'," +
              "'IT Engineer'" +
              ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
        session.execute(
              "INSERT INTO tester.dept (id, dname) " +
              "VALUES (" +
                  "1553," +
                  "'Commerce'" +
                  ");");

回答1:

The where method adds ALLOW FILTERING to your query under the covers. This is not a magic bullet, as it still doesn't support arbitrary fields as query predicates. In general, the field must either be indexed or a clustering column. If this isn't practical for your data model, you can simply use the filter method on the RDD. The downside is that the filter takes place in Spark and not in Cassandra.

So the id field works because it's supported in a CQL WHERE clause, whereas I'm assuming role is just a regular field. Please note that I am NOT suggesting that you index your field or change it to a clustering column, as I don't know your data model.



回答2:

There is a limitation in the Spark Cassandra Connector that the where method will not work on partitioning keys. In your table empByRole, role is a partitioning key, hence the error. It should work correctly on clustering columns or indexed columns (secondary indexes).

This is being tracked as issue 37 in the GitHub project and work has been ongoing.

On the Java API doc page, the examples shown used .where("name=?", "Anna"). I assume that name is not a partitioning key, but the example could be more clear about that.