Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1: ClassNotFoundException

Question:

I'm trying to connect to Phoenix via Spark and I keep getting the following exception when opening a connection via the JDBC driver (cut for brevity, full stacktrace below):

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)

The class in question is provided by the jar called phoenix-core-4.3.1.jar (despite it being in the HBase package namespace, I guess they need it to integrate with HBase).

There are numerous questions on SO about ClassNotFoundExceptions on Spark and I've tried the fat-jar approach (both with Maven's assembly and shade plugins; I've inspected the jars, they do contain ClientRpcControllerFactory), and I've tried a lean jar while specifying the jars on the command line. For the latter, the command I used is as follows:

/opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true

I've also done a classpath dump from within the code, and the first classloader in the hierarchy already has the Phoenix jar:

2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO  nl.work.kafkastreamconsumer.phoenix.LinePersister - [file:/home/work/projects/customer/KafkaStreamConsumer.jar, file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar, file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar, file:/home/work/projects/customer/lib/zkclient-0.3.jar, file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar, file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar, file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]

So the question is: What am I missing here? Why can't Spark load the correct class? There should be only one version of the class flying around (namely the one from phoenix-core), so I doubt it's a versioning conflict.

[Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
        at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
        at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
        at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:233)
        at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
        ... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
        at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
        ... 36 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
        ... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
        at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
        ... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
        ... 46 more

/edit

Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the classes in question. Since the saveToPhoenix() method is not yet available in the Java API and this is just a POC, my idea was to simply use the JDBC driver for each mini-batch.

public class PhoenixConnection implements AutoCloseable, Serializable {
    private static final long serialVersionUID = -4491057264383873689L;
    private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    static {
        try {
            Class.forName(PHOENIX_DRIVER);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    private Connection connection;

    public PhoenixConnection(final String jdbcUri) {

        try {
            connection = DriverManager.getConnection(jdbcUri);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {

        ArrayList<Map<String, Object>> resultList = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(sql); ResultSet resultSet = statement.executeQuery()) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
                // JDBC column indices are 1-based
                for (int column = 1; column <= metaData.getColumnCount(); ++column) {
                    final String columnLabel = metaData.getColumnLabel(column);
                    row.put(columnLabel, resultSet.getObject(columnLabel));
                }
                resultList.add(row); // was missing: rows were built but never returned
            }
        }
        resultList.trimToSize();

        return resultList;
    }

    @Override
    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

}

public class LinePersister implements Function<JavaRDD<String>, Void> {
    private static final long serialVersionUID = -2529724617108874989L;
    private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
    private static final String TABLE_NAME = "mail_events";

    private final String jdbcUrl;

    public LinePersister(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }



    @Override
    public Void call(JavaRDD<String> dataSet) throws Exception {
        LOGGER.info(String.format(
                "Starting conversion on rdd with %d elements", dataSet.count()));

        List<Void> collectResult = dataSet.map(new Function<String, Void>() {

            private static final long serialVersionUID = -6651313541439109868L;

            @Override
            public Void call(String line) throws Exception {
                LOGGER.info("Writing line " + line);
                Event event = EventParser.parseLine(line);
                try (PhoenixConnection connection = new PhoenixConnection(
                        jdbcUrl)) {
                    connection.executeQuery(event
                            .createUpsertStatement(TABLE_NAME));
                } catch (Exception e) {
                    LOGGER.error("Error while processing line", e);
                    dumpClasspath(this.getClass().getClassLoader());

                }
                return null;
            }
        }).collect();

        LOGGER.info(String.format("Got %d results: ", collectResult.size()));

        return null;
    }

    public static void dumpClasspath(ClassLoader loader)
    {
        LOGGER.info("Classloader " + loader + ":");

        if (loader instanceof URLClassLoader)
        {
            URLClassLoader ucl = (URLClassLoader)loader;
            LOGGER.info(Arrays.toString(ucl.getURLs()));
        }
        else
            LOGGER.error("cannot display components as not a URLClassLoader)");

        if (loader.getParent() != null)
            dumpClasspath(loader.getParent());
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>nl.work</groupId>
    <artifactId>KafkaStreamConsumer</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <spark.version>1.3.1</spark.version>
        <hibernate.version>4.3.10.Final</hibernate.version>
        <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
        <hbase.version>0.98.9-hadoop2</hbase.version>
        <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>${phoenix.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>${phoenix.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.cloudera</groupId>
            <artifactId>spark-hbase</artifactId>
            <version>${spark-hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> 
                <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> 
                <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> 
                <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> 
                <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> 
                </execution> </executions> </plugin> -->
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>unknown-jars-temp-repo</id>
            <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
            <url>file:${project.basedir}/lib</url>
        </repository>
    </repositories>
</project>

/edit2 I've tried the saveAsNewAPIHadoopFile approach (https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java), but that yields the same error, just with a different stacktrace:

java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
        at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:187)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
        at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
        at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
        ... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
        at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
        ... 23 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
        ... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
        at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
        ... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
        ... 34 more
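
For reference, the approach in the gist boils down to the following minimal sketch. EventWritable, the MAIL_EVENTS table and the column list are placeholders for illustration, not the actual job:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class PhoenixSink {

    /** Placeholder writable: one upsert into MAIL_EVENTS(HOST, LINE). */
    public static class EventWritable implements DBWritable, Writable {
        private String host;
        private String line;

        public EventWritable() { } // required by Hadoop's reflection

        public EventWritable(String host, String line) {
            this.host = host;
            this.line = line;
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            // Parameter order must match the column list passed to setOutput() below
            statement.setString(1, host);
            statement.setString(2, line);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            host = resultSet.getString("HOST");
            line = resultSet.getString("LINE");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(host);
            out.writeUTF(line);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            host = in.readUTF();
            line = in.readUTF();
        }
    }

    public static void save(JavaRDD<String> lines) throws IOException {
        Job job = Job.getInstance(HBaseConfiguration.create());
        job.getConfiguration().set("hbase.zookeeper.quorum", "node1:5181");
        // Tells PhoenixOutputFormat which table and columns to upsert into
        PhoenixMapReduceUtil.setOutput(job, "MAIL_EVENTS", "HOST,LINE");

        JavaPairRDD<NullWritable, EventWritable> upserts = lines.mapToPair(
                new PairFunction<String, NullWritable, EventWritable>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<NullWritable, EventWritable> call(String line) {
                        return new Tuple2<>(NullWritable.get(), new EventWritable("node1", line));
                    }
                });

        upserts.saveAsNewAPIHadoopFile(
                "/tmp/ignored", // the output path is not actually used by PhoenixOutputFormat
                NullWritable.class,
                EventWritable.class,
                PhoenixOutputFormat.class,
                job.getConfiguration());
    }
}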

Answer 1:

The nice people at the Phoenix mailing list gave me the answer:

"Rather than bundle the Phoenix client JAR with your app, are you able to include it in a static location either in the SPARK_CLASSPATH, or set the conf values below (I use SPARK_CLASSPATH myself, though it's deprecated): spark.driver.extraClassPath spark.executor.extraClassPath "

https://www.mail-archive.com/user@spark.apache.org/msg29978.html
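
Concretely, that looks something like this (the client-jar path is an assumption for this cluster; the jar has to exist at the same path on the driver and on every worker node):

export SPARK_CLASSPATH=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar

or, equivalently, via the conf values:

/opt/mapr/spark/spark-1.3.1/bin/spark-submit \
    --conf spark.driver.extraClassPath=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar \
    --conf spark.executor.extraClassPath=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar \
    --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
    KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true

This helps where --jars does not because extraClassPath puts the jar on the executor JVM's system classpath, while --jars only exposes it through Spark's task classloader. That matches the stacktraces above, where HBase's ReflectionUtils resolves the class through sun.misc.Launcher$AppClassLoader, which never sees jars shipped via --jars.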



Answer 2:

You cannot use Spark with version 4.3.1 of Apache Phoenix; Spark support only exists from version 4.4.0 onwards. The jars have now been added to the Maven repo (the source was released on May 29th).

So if you upgrade to Phoenix 4.4.0 you should be able to access it. If you are using CDH, the client jar has not been released yet.

Thanks

PS: I raised a JIRA ticket for this, and the response above came from one of the Phoenix committers.



Answer 3:

I had the same problem. The cause is that Phoenix uses a custom RPC controller factory, ClientRpcControllerFactory, which is Phoenix-specific and configures the priorities for the index and system catalog tables on the cluster side.

Sometimes Phoenix-enabled clusters are also used from pure-HBase client applications, which results in ClassNotFoundExceptions in application code or MapReduce jobs. Since the HBase configuration is shared between Phoenix clients and plain HBase clients, having different configurations on the client side is hard, and that is why you get this exception. The problem is fixed by HBASE-14960. If your HBase version is older than 2.0.0, 1.2.0, 1.3.0, or 0.98.17, you can define your client-side RPC controller with this setting in hbase-site.xml:

  <property>
    <name>hbase.rpc.controllerfactory.class</name>
    <value>org.apache.hadoop.hbase.ipc.RpcControllerFactory</value>  
  </property>
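
If editing hbase-site.xml on the client is not an option, the same override can probably be passed as a JDBC connection property, since Phoenix copies connection properties into the HBase client configuration it builds. A minimal sketch under that assumption:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class PlainRpcControllerConnection {

    public static Connection open(String jdbcUrl) throws SQLException {
        Properties props = new Properties();
        // Fall back to the stock HBase controller factory instead of
        // Phoenix's ClientRpcControllerFactory
        props.setProperty("hbase.rpc.controllerfactory.class",
                "org.apache.hadoop.hbase.ipc.RpcControllerFactory");
        return DriverManager.getConnection(jdbcUrl, props);
    }
}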