JSON data sink to Apache Phoenix with Apache Flume

Posted 2019-07-24 18:49

I want to sink JSON data into Apache Phoenix with Apache Flume. I followed an online guide, http://kalyanbigdatatraining.blogspot.com/2016/10/how-to-stream-json-data-into-phoenix.html, but hit the error below. How can I resolve it? Many thanks!

My environment is as follows:

  • hadoop-2.7.3
  • hbase-1.3.1
  • phoenix-4.12.0-HBase-1.3-bin
  • flume-1.7.0

In Flume, I added the Phoenix sink jars to $FLUME_HOME/plugins.d/phoenix-sink/lib:

commons-io-2.4.jar
json-path-2.2.0.jar
phoenix-flume-4.12.0-HBase-1.3.jar
twill-api-0.8.0.jar
twill-common-0.8.0.jar
twill-core-0.8.0.jar
twill-discovery-api-0.8.0.jar
twill-discovery-core-0.8.0.jar
twill-zookeeper-0.8.0.jar

When the agent starts, the sink fails with the following log:

2017-11-11 14:49:54,786 (lifecycleSupervisor-1-1) [DEBUG - org.apache.phoenix.jdbc.PhoenixDriver$2.onRemoval(PhoenixDriver.java:159)] Expiring localhost:2181:/hbase because of EXPLICIT
2017-11-11 14:49:54,787 (lifecycleSupervisor-1-1) [INFO - org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.closeZooKeeperWatcher(ConnectionManager.java:1712)] Closing zookeeper sessionid=0x15fa8952cea00a6
2017-11-11 14:49:54,787 (lifecycleSupervisor-1-1) [DEBUG - org.apache.zookeeper.ZooKeeper.close(ZooKeeper.java:673)] Closing session: 0x15fa8952cea00a6
2017-11-11 14:49:54,787 (lifecycleSupervisor-1-1) [DEBUG - org.apache.zookeeper.ClientCnxn.close(ClientCnxn.java:1306)] Closing client for session: 0x15fa8952cea00a6
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:818)] Reading reply sessionid:0x15fa8952cea00a6, packet:: clientPath:null serverPath:null finished:false header:: 3,-11 replyHeader:: 3,2620,0 request:: null response:: null
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1) [DEBUG - org.apache.zookeeper.ClientCnxn.disconnect(ClientCnxn.java:1290)] Disconnecting client for session: 0x15fa8952cea00a6
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1086)] An exception was thrown while closing send thread for session 0x15fa8952cea00a6 : Unable to read additional data from server sessionid 0x15fa8952cea00a6, likely server has closed socket
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1) [INFO - org.apache.zookeeper.ZooKeeper.close(ZooKeeper.java:684)] Session: 0x15fa8952cea00a6 closed
2017-11-11 14:49:54,789 (lifecycleSupervisor-1-1-EventThread) [INFO - org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:512)] EventThread shut down
2017-11-11 14:49:54,790 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d2052a0 counterGroup:{ name:null counters:{} } } - Exception follows.

java.lang.NoSuchMethodError: org.apache.twill.zookeeper.ZKClientService.startAndWait()Lcom/google/common/util/concurrent/Service$State;

at org.apache.phoenix.transaction.TephraTransactionContext.setTransactionClient(TephraTransactionContext.java:147)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.initTxServiceClient(ConnectionQueryServicesImpl.java:401)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:415)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$500(ConnectionQueryServicesImpl.java:257)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2384)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2360)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2360)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:150)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.phoenix.flume.serializer.BaseEventSerializer.initialize(BaseEventSerializer.java:140)
at org.apache.phoenix.flume.sink.PhoenixSink.start(PhoenixSink.java:119)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

2017-11-11 14:49:54,792 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:149)] Component type: SINK, name: Phoenix Sink__1 stopped
2017-11-11 14:49:54,792 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: SINK, name: Phoenix Sink__1. sink.start.time == 1510382993516

Here is my flume-agent.properties:

agent.sources = exec
agent.channels = mem-channel
agent.sinks = phoenix-sink
agent.sources.exec.type = exec
agent.sources.exec.command = tail -F /Users/chenshuai1/tmp/users.json
agent.sources.exec.channels = mem-channel
agent.sinks.phoenix-sink.type = org.apache.phoenix.flume.sink.PhoenixSink
agent.sinks.phoenix-sink.batchSize = 10
agent.sinks.phoenix-sink.zookeeperQuorum = localhost
agent.sinks.phoenix-sink.table = users2
agent.sinks.phoenix-sink.ddl = CREATE TABLE IF NOT EXISTS users2 (userid BIGINT NOT NULL, username VARCHAR, password VARCHAR, email VARCHAR, country VARCHAR, state VARCHAR, city VARCHAR, dt VARCHAR NOT NULL CONSTRAINT PK PRIMARY KEY (userid, dt))
agent.sinks.phoenix-sink.serializer = json
agent.sinks.phoenix-sink.serializer.columnsMapping = {"userid":"userid", "username":"username", "password":"password", "email":"email", "country":"country", "state":"state", "city":"city", "dt":"dt"}
agent.sinks.phoenix-sink.serializer.partialSchema = true
agent.sinks.phoenix-sink.serializer.columns = userid,username,password,email,country,state,city,dt
agent.sinks.phoenix-sink.channel = mem-channel
agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 1000
agent.channels.mem-channel.transactionCapacity = 100
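
With this configuration the exec source turns each line that tail prints into one Flume event, so every line of users.json is expected to be a single JSON object whose keys match the columnsMapping above. As a sanity check on the input, independent of the startup error, the sketch below reports which of the configured keys are absent from a line. The names REQUIRED_KEYS and missing_keys are hypothetical, and how the Phoenix serializer treats an absent key is not asserted here.

```python
import json

# Column names copied from serializer.columns in the configuration above.
REQUIRED_KEYS = ["userid", "username", "password", "email",
                 "country", "state", "city", "dt"]


def missing_keys(line, required=REQUIRED_KEYS):
    """Return the keys from `required` that are absent from one JSON line.

    Raises ValueError if the line is not valid JSON or is not a JSON object.
    """
    record = json.loads(line)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    return [key for key in required if key not in record]
```

An empty list means that the line carries every mapped key. Since userid and dt form the primary key in the DDL, those two are the ones most worth checking.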
