Flume - TwitterSource language filter

Posted 2019-08-03 01:35

Question:

I would like to ask your help in the following case.

I'm currently using Cloudera CDH 5.1.2, and I tried to collect Twitter data using Flume as described in the following Cloudera posts:

  • http://blog.cloudera.com/blog/2012/10/analyzing-twitter-data-with-hadoop-part-2-gathering-data-with-flume/
  • github.com/cloudera/cdh-twitter-example

I downloaded the source and rebuilt the flume-sources after updating the versions in pom.xml:

<flume.version>1.5.0-cdh5.1.2</flume.version>
<hadoop.version>2.3.0-cdh5.1.2</hadoop.version>

It worked perfectly.

After that I wanted to add a "language" filter to capture only the tweets in specific languages. To do this, I modified TwitterSource.java to call the FilterQuery.language method, roughly like this:

FilterQuery query = new FilterQuery();
...
if (languages.length != 0) {
    query.language(languages);
}
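The `languages` array in the snippet above has to be filled from somewhere. The following is only a minimal sketch of one way to do that, assuming the language codes arrive as a single comma-separated string; the class name `LanguageConfig` and the helper `parseLanguages` are hypothetical and are not part of the original TwitterSource.java.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the cdh-twitter-example sources.
final class LanguageConfig {

    // Splits a comma-separated value such as "en, hu" into trimmed,
    // non-empty codes. A null or blank input yields an empty array, so the
    // "languages.length != 0" check in the snippet above skips the filter.
    static String[] parseLanguages(String raw) {
        List<String> codes = new ArrayList<>();
        if (raw != null) {
            for (String part : raw.split(",")) {
                String code = part.trim();
                if (!code.isEmpty()) {
                    codes.add(code);
                }
            }
        }
        return codes.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] languages = parseLanguages(args.length > 0 ? args[0] : "");
        System.out.println(languages.length + " language code(s) configured");
    }
}
```

Returning an empty array rather than null keeps the length check in the calling code safe when no languages are configured.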

I'm trying to use twitter4j-stream version 3.0.6. I updated it in pom.xml:

<!-- For the Twitter API -->
<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>3.0.6</version>
</dependency>

With these settings I rebuilt the jar (mvn package).

When I start my agent, I get the following exception (NoSuchMethodError):

Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } - Exception follows.
java.lang.NoSuchMethodError: twitter4j.FilterQuery.language([Ljava/lang/String;)Ltwitter4j/FilterQuery;
    at com.cloudera.flume.source.TwitterSource.start(TwitterSource.java:165)
    at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I checked, and this version of twitter4j-stream contains the language method:

  • github.com/yusuke/twitter4j/blob/3.0.6/twitter4j-stream/src/main/java/twitter4j/FilterQuery.java

What am I doing wrong?

Thanks in advance,

Peter

Answer 1:

I finally managed to solve this problem, so here's the solution for anyone out there facing the same issue.

Initially (in the case described in the original post) I placed my generated jar in /var/lib/flume-ng/plugins.d/twitter-streaming/lib/ and configured Cloudera Manager to use this location.

In this case CM placed this directory at the end of the classpath in the runner file (after the parcel directory). So the directory order in the classpath looked like this:

  • /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/flume-ng/lib/*

  • /var/lib/flume-ng/plugins.d/twitter-streaming/lib/*
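The order matters because the JVM's class path is searched front to back and the first entry that contains a class wins. To see where a given library appears in that order, something like the following could be run with the same classpath as the agent. It is only a sketch: the class `ClasspathScan` and its helper `matchingEntries` are hypothetical names, and it assumes the `java.class.path` system property reflects the classpath the agent was actually started with.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical diagnostic, not part of Flume or the cdh-twitter-example.
final class ClasspathScan {

    // Returns the classpath entries containing `needle`, in classpath order,
    // so the first element is the one the class loader reaches first.
    static List<String> matchingEntries(String classPath, String separator, String needle) {
        List<String> hits = new ArrayList<>();
        for (String entry : classPath.split(Pattern.quote(separator))) {
            if (entry.contains(needle)) {
                hits.add(entry);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String classPath = System.getProperty("java.class.path", "");
        for (String entry : matchingEntries(classPath, File.pathSeparator, "twitter4j")) {
            System.out.println(entry);
        }
    }
}
```

If two versions of the same jar show up in the output, the one printed first is the one whose classes get loaded.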

Unfortunately there was a twitter4j-stream-3.0.3.jar and a twitter4j-core-3.0.3.jar in the parcel directory. Because that directory comes first in the classpath, Flume loaded those jars instead of 3.0.6, and in that version FilterQuery.language doesn't exist.
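A conflict of this kind can also be confirmed from inside the JVM by asking a class which jar it was loaded from and whether it has the expected method. The sketch below uses only standard reflection calls; the class `ClassOrigin` and its helpers are hypothetical names, and the default class name `twitter4j.FilterQuery` resolves only if twitter4j is on the classpath when it is run.

```java
import java.security.CodeSource;

// Hypothetical diagnostic, not part of Flume or twitter4j.
final class ClassOrigin {

    // Location (usually a jar URL) the class was loaded from. Classes from
    // the JDK itself typically have no CodeSource, hence the fallback text.
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
    }

    // True if the loaded class exposes a public method with this signature.
    static boolean hasPublicMethod(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            type.getMethod(name, parameterTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String className = args.length > 0 ? args[0] : "twitter4j.FilterQuery";
        Class<?> type = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(type));
        System.out.println("language(String[]) present: "
                + hasPublicMethod(type, "language", String[].class));
    }
}
```

Run with the agent's classpath, this would be expected to print the parcel jar and report the method as missing in the failing setup, matching the NoSuchMethodError in the question.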

So I just deleted those jars from the parcel directory, and it works fine now.



Answer 2:

I tried this with CDH3 and it worked fine for me. One thing I noticed is that the system time should be set to the current time. In your case, I think it is looking for the language method in the FilterQuery class.