DataFrame where clause doesn't work when using the Spark Cassandra connector


Question:

We use the DataStax Python Spark Cassandra driver v3.0.0. When we try to load data using a DataFrame, the where clause doesn't work, although the CQL itself does work in DataStax DevCenter. The code looks like this:

dataf = sqlc.read.format("org.apache.spark.sql.cassandra")\
    .options(table="tran_history", keyspace="test")\
    .load()\
    .where("usr_id = 'abc' and log_ts >= maxtimeuuid('2016-02-01 10:09:26-0800')")\
    .collect()

It seems the driver doesn't recognize the maxtimeuuid function.

Below is the error:

File "C:\Spark\spark-1.4.1-bin-hadoop2.6.2\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 759, in filter

File "C:\Spark\spark-1.4.1-bin-hadoop2.6.2\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__

File "C:\Spark\spark-1.4.1-bin-hadoop2.6.2\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o41.filter.

: java.util.NoSuchElementException: key not found: maxtimeuuid

Not sure if there is a version mismatch issue. We are using DSE 4.8.1.

Answer 1:

API Conflicts

DataFrames do not use the Spark Cassandra Connector API, so calling where on a DataFrame actually invokes a Catalyst expression. The predicate is not handed down to the underlying CQL layer; it is applied within Spark itself. Spark doesn't know what maxtimeuuid is, so it fails. The DataFrame docs describe where simply as:

Filters rows using the given SQL expression.

See http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
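For contrast, here is a minimal sketch of a predicate Catalyst does understand, reusing the table from the question: plain column comparisons parse as ordinary Spark SQL and can be pushed down to Cassandra by the connector.

dataf = sqlc.read.format("org.apache.spark.sql.cassandra")\
    .options(table="tran_history", keyspace="test")\
    .load()\
    .where("usr_id = 'abc'")  # plain Spark SQL comparison, no CQL functions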

Since the maxtimeuuid predicate is invalid in Spark SQL, it never reaches the connector, so a clause like this cannot be processed at the datasource level.

Only the Spark Cassandra Connector's RDD.where clause passes CQL directly through to the underlying query:

Adds a CQL WHERE predicate(s) to the query. Useful for leveraging secondary indexes in Cassandra. Implicitly adds an ALLOW FILTERING clause to the WHERE clause, however beware that some predicates might be rejected by Cassandra, particularly in cases when they filter on an unindexed, non-clustering column.

http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.0-M1/spark-cassandra-connector/#com.datastax.spark.connector.rdd.CassandraRDD
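The RDD.where API quoted above belongs to the Scala/Java connector. From Python, one possible route is the third-party pyspark-cassandra package, which exposes the connector's RDD API; the sketch below is hedged, since the package, its CassandraSparkContext entry point, and the exact where signature are assumptions you would need to verify against your DSE and connector versions.

from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext  # third-party wrapper, not part of DSE

conf = SparkConf().setAppName("tran-history-filter")
sc = CassandraSparkContext(conf=conf)

# The clause string is passed through as CQL, so Cassandra itself
# evaluates maxtimeuuid instead of Spark's Catalyst parser.
rows = sc.cassandraTable("test", "tran_history")\
    .where("log_ts >= maxtimeuuid('2016-02-01 10:09:26-0800')")\
    .collect()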

DataFrames and TimeUUID

Comparing TimeUUIDs with DataFrames is going to be difficult, since Catalyst has no notion of TimeUUID as a type, so the connector reads them (through DataFrames) as strings. This is a problem because TimeUUIDs are not lexically comparable, so you won't get the right answer even if you generate the TimeUUID yourself and compare against it directly instead of calling a function.
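One possible workaround, as a sketch: a TimeUUID is a version 1 UUID, which embeds its timestamp, so you can extract that timestamp with a Python UDF and compare numerically. The epoch_ms name is illustrative, and it assumes log_ts arrives as a string column, as described above.

from uuid import UUID
from datetime import datetime, timezone, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# A version 1 (time-based) UUID stores a 60-bit count of 100 ns intervals
# since 1582-10-15; 0x01B21DD213814000 is that epoch's offset from the
# Unix epoch in the same units. Dividing by 10**4 yields milliseconds.
def timeuuid_to_epoch_ms(tu):
    return (UUID(tu).time - 0x01B21DD213814000) // 10**4

epoch_ms = udf(timeuuid_to_epoch_ms, LongType())

# Cutoff equivalent to maxtimeuuid('2016-02-01 10:09:26-0800')
cutoff = datetime(2016, 2, 1, 10, 9, 26, tzinfo=timezone(timedelta(hours=-8)))
cutoff_ms = int(cutoff.timestamp() * 1000)

dataf = sqlc.read.format("org.apache.spark.sql.cassandra")\
    .options(table="tran_history", keyspace="test")\
    .load()\
    .where("usr_id = 'abc'")\
    .filter(epoch_ms("log_ts") >= cutoff_ms)\
    .collect()

Note that the UDF runs in Spark after the rows have been read from Cassandra, so nothing here is pushed down to the datasource; it only fixes the comparison semantics.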