I want to sink JSON data into Apache Phoenix with Apache Flume. I followed an online guide, http://kalyanbigdatatraining.blogspot.com/2016/10/how-to-stream-json-data-into-phoenix.html, but ran into the following error. How can I resolve it? Many thanks!
My environment:
- hadoop-2.7.3
- hbase-1.3.1
- phoenix-4.12.0-HBase-1.3-bin
- flume-1.7.0
In Flume, I added the Phoenix sink related JARs under $FLUME_HOME/plugins.d/phoenix-sink/lib:
commons-io-2.4.jar
twill-api-0.8.0.jar
twill-discovery-api-0.8.0.jar
json-path-2.2.0.jar
twill-common-0.8.0.jar
twill-discovery-core-0.8.0.jar
phoenix-flume-4.12.0-HBase-1.3.jar
twill-core-0.8.0.jar
twill-zookeeper-0.8.0.jar
When I start the Flume agent, I get this error:
2017-11-11 14:49:54,786 (lifecycleSupervisor-1-1) [DEBUG - org.apache.phoenix.jdbc.PhoenixDriver$2.onRemoval(PhoenixDriver.java:159)] Expiring localhost:2181:/hbase because of EXPLICIT
2017-11-11 14:49:54,787 (lifecycleSupervisor-1-1) [INFO - org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.closeZooKeeperWatcher(ConnectionManager.java:1712)] Closing zookeeper sessionid=0x15fa8952cea00a6
2017-11-11 14:49:54,787 (lifecycleSupervisor-1-1) [DEBUG - org.apache.zookeeper.ZooKeeper.close(ZooKeeper.java:673)] Closing session: 0x15fa8952cea00a6
2017-11-11 14:49:54,787 (lifecycleSupervisor-1-1) [DEBUG - org.apache.zookeeper.ClientCnxn.close(ClientCnxn.java:1306)] Closing client for session: 0x15fa8952cea00a6
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:818)] Reading reply sessionid:0x15fa8952cea00a6, packet:: clientPath:null serverPath:null finished:false header:: 3,-11 replyHeader:: 3,2620,0 request:: null response:: null
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1) [DEBUG - org.apache.zookeeper.ClientCnxn.disconnect(ClientCnxn.java:1290)] Disconnecting client for session: 0x15fa8952cea00a6
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1086)] An exception was thrown while closing send thread for session 0x15fa8952cea00a6 : Unable to read additional data from server sessionid 0x15fa8952cea00a6, likely server has closed socket
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1) [INFO - org.apache.zookeeper.ZooKeeper.close(ZooKeeper.java:684)] Session: 0x15fa8952cea00a6 closed
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1-EventThread) [INFO - org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:512)] EventThread shut down
2017-11-11 14:49:54,790 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d2052a0 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.apache.twill.zookeeper.ZKClientService.startAndWait()Lcom/google/common/util/concurrent/Service$State;
at org.apache.phoenix.transaction.TephraTransactionContext.setTransactionClient(TephraTransactionContext.java:147)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.initTxServiceClient(ConnectionQueryServicesImpl.java:401)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:415)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$500(ConnectionQueryServicesImpl.java:257)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2384)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2360)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2360)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:150)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.phoenix.flume.serializer.BaseEventSerializer.initialize(BaseEventSerializer.java:140)
at org.apache.phoenix.flume.sink.PhoenixSink.start(PhoenixSink.java:119)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2017-11-11 14:49:54,792 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:149)] Component type: SINK, name: Phoenix Sink__1 stopped
2017-11-11 14:49:54,792 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: SINK, name: Phoenix Sink__1. sink.start.time == 1510382993516
Here is my flume-agent.properties:
agent.sources = exec
agent.channels = mem-channel
agent.sinks = phoenix-sink
agent.sources.exec.type = exec
agent.sources.exec.command = tail -F /Users/chenshuai1/tmp/users.json
agent.sources.exec.channels = mem-channel
agent.sinks.phoenix-sink.type = org.apache.phoenix.flume.sink.PhoenixSink
agent.sinks.phoenix-sink.batchSize = 10
agent.sinks.phoenix-sink.zookeeperQuorum = localhost
agent.sinks.phoenix-sink.table = users2
agent.sinks.phoenix-sink.ddl = CREATE TABLE IF NOT EXISTS users2 (userid BIGINT NOT NULL, username VARCHAR, password VARCHAR, email VARCHAR, country VARCHAR, state VARCHAR, city VARCHAR, dt VARCHAR NOT NULL CONSTRAINT PK PRIMARY KEY (userid, dt))
agent.sinks.phoenix-sink.serializer = json
agent.sinks.phoenix-sink.serializer.columnsMapping = {"userid":"userid", "username":"username", "password":"password", "email":"email", "country":"country", "state":"state", "city":"city", "dt":"dt"}
agent.sinks.phoenix-sink.serializer.partialSchema = true
agent.sinks.phoenix-sink.serializer.columns = userid,username,password,email,country,state,city,dt
agent.sinks.phoenix-sink.channel = mem-channel
agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 1000
agent.channels.mem-channel.transactionCapacity = 100
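For reference, each line of /Users/chenshuai1/tmp/users.json that the exec source tails is a single JSON object with the fields listed in columnsMapping above (the values below are just made-up examples, not my real data):
{"userid": 1, "username": "alice", "password": "secret", "email": "alice@example.com", "country": "US", "state": "CA", "city": "San Francisco", "dt": "2017-11-11"}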