Spark with Flume (configuration/classpath?)

Posted 2019-09-07 04:08

I am trying to get Spark working with Flume. My Flume configuration is below:

#Declare
log.sources = src
log.sinks = spark
log.channels = chs

#Define Source

log.sources.src.type = exec
log.sources.src.command = sh /home/user/shell/flume.sh

#Define Sink
log.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
log.sinks.spark.hostname = localhost
log.sinks.spark.port = 9999
log.sinks.spark.channel = chs

#Define Channels

log.channels.chs.type = memory

#Tie Source and Sink to Channel

log.sinks.snk.channel = chs
log.sources.src.channels = chs
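
A note on the configuration above: `log.sinks` declares only `spark`, yet one line binds a channel to a sink named `snk`. That stray key does not explain the classpath error, but it is easy to miss. The following is a minimal Python sketch of a check for such undeclared names. The helper `undeclared_components` is hypothetical and not part of Flume, and it assumes the simple `agent.kind.name.property = value` layout used here.

```python
def undeclared_components(conf_text, agent, kind):
    """Return names used in '<agent>.<kind>.<name>.*' keys that are not
    listed on the '<agent>.<kind> = ...' declaration line."""
    declared, used = set(), set()
    prefix = f"{agent}.{kind}"
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blanks, comments, and anything that is not a key = value pair.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        if key == prefix:
            # Declaration line, e.g. "log.sinks = spark".
            declared.update(value.split())
        elif key.startswith(prefix + "."):
            # Property line, e.g. "log.sinks.snk.channel = chs" -> "snk".
            used.add(key[len(prefix) + 1:].split(".", 1)[0])
    return sorted(used - declared)


# A trimmed copy of the configuration from the question.
conf = """\
log.sources = src
log.sinks = spark
log.channels = chs
log.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
log.sinks.spark.channel = chs
log.sinks.snk.channel = chs
log.sources.src.channels = chs
"""

print(undeclared_components(conf, "log", "sinks"))  # ['snk']
```

Since `log.sinks.spark.channel = chs` is already set, the `snk` line appears to be redundant.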

$ ls -lrt $FLUME_CLASSPATH
-rw-r--r-- 1 root root 7126372 Mar 18 2014 scala-library-2.10.4.jar
-rw-r--r-- 1 root root 412739 Apr 6 2014 commons-lang3-3.3.2.jar
-rw-r--r-- 1 root root 86020 Sep 24 00:15 spark-streaming-flume-sink_2.10-1.5.1.jar
-rw-r--r-- 1 root root 7126003 Nov 7 19:09 scala-library-2.10.3.jar
-rw-r--r-- 1 root root 82325 Nov 7 19:26 spark-streaming-flume-sink_2.11-1.2.0.jar

$ flume-ng agent -f simplelogger.conf -n log

15/11/07 19:48:05 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/11/07 19:48:05 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:simplelogger.conf
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:snk
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Added sinks: spark Agent: log
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [log]
15/11/07 19:48:05 INFO node.AbstractConfigurationProvider: Creating channels
15/11/07 19:48:05 INFO channel.DefaultChannelFactory: Creating instance of channel chs type memory
15/11/07 19:48:05 INFO node.AbstractConfigurationProvider: Created channel chs
15/11/07 19:48:05 INFO source.DefaultSourceFactory: Creating instance of source src, type exec
15/11/07 19:48:05 INFO sink.DefaultSinkFactory: Creating instance of sink: spark, type: org.apache.spark.streaming.flume.sink.SparkSink
15/11/07 19:48:05 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: scala/Function1
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:190)
        at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: scala.Function1
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 14 more

I also have a plugins.d folder in the current working directory (where the Flume configuration file lives):

plugins.d/:
plugins.d/spark:
plugins.d/spark/lib:
-rw-r--r-- 1 rgopalk rgopalk 82325 Nov 7 19:31 spark-streaming-flume-sink_2.11-1.2.0.jar

Any pointers please?

PS: Having multiple versions of the spark-streaming-flume-sink and scala-library jars in FLUME_CLASSPATH doesn't make any difference. The error is the same with a single version of each.

1 Answer
SAY GOODBYE
Reply #2 · 2019-09-07 04:18

I copied all the jar files listed above to {FLUME_INSTALLATION_DIR}/lib. I also copied the spark-assembly jar from {SPARK_HOME}/lib to {FLUME_INSTALLATION_DIR}/lib, and it started working:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flume/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume/lib/spark-assembly-1.5.1-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/11/07 21:18:15 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/11/07 21:18:15 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:simplelogger.conf
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:snk
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Added sinks: spark Agent: log
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [log]
15/11/07 21:18:15 INFO node.AbstractConfigurationProvider: Creating channels
15/11/07 21:18:15 INFO channel.DefaultChannelFactory: Creating instance of channel chs type memory
15/11/07 21:18:15 INFO node.AbstractConfigurationProvider: Created channel chs
15/11/07 21:18:15 INFO source.DefaultSourceFactory: Creating instance of source src, type exec
15/11/07 21:18:15 INFO sink.DefaultSinkFactory: Creating instance of sink: spark, type: org.apache.spark.streaming.flume.sink.SparkSink
15/11/07 21:18:15 INFO sink.SparkSink: Configured Spark Sink with hostname: localhost, port: 9999, poolSize: 10, transactionTimeout: 60, backoffInterval: 200
15/11/07 21:18:15 INFO node.AbstractConfigurationProvider: Channel chs connected to [src, spark]
15/11/07 21:18:15 INFO node.Application: Starting new configuration:{ sourceRunners:{src=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:src,state:IDLE} }} sinkRunners:{spark=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2b430201 counterGroup:{ name:null counters:{} } }} channels:{chs=org.apache.flume.channel.MemoryChannel{name: chs}} }
15/11/07 21:18:15 INFO node.Application: Starting Channel chs
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: chs, registered successfully.
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: chs started
15/11/07 21:18:15 INFO node.Application: Starting Sink spark
15/11/07 21:18:15 INFO sink.SparkSink: Starting Spark Sink: spark on port: 9999 and interface: localhost with pool size: 10 and transaction timeout: 60.
15/11/07 21:18:15 INFO node.Application: Starting Source src
15/11/07 21:18:15 INFO source.ExecSource: Exec source starting with command:sh /home/rgopalk/shell/flume.sh
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SOURCE, name: src, registered successfully.
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: src started
15/11/07 21:18:16 INFO sink.SparkSink: Starting Avro server for sink: spark
15/11/07 21:18:16 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run..
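
The original error (`ClassNotFoundException: scala.Function1`) means the class was not visible to Flume's class loader. That class ships in scala-library. One possible explanation, offered only as an assumption: `ls $FLUME_CLASSPATH` succeeding suggests the variable pointed at a directory, and a bare directory on a Java classpath exposes loose `.class` files but not the jars inside it. To confirm which jars in a directory actually contain a given class, here is a minimal Python sketch. The helper `jars_providing` is hypothetical, and the demo builds throwaway jars in a temporary directory rather than reading a real Flume install.

```python
import tempfile
import zipfile
from pathlib import Path


def jars_providing(lib_dir, class_name):
    """Return the names of the *.jar files in lib_dir that contain class_name."""
    # A class such as scala.Function1 is stored as scala/Function1.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives.
            continue
    return hits


# Demo with two tiny stand-in jars (the names are made up for illustration).
with tempfile.TemporaryDirectory() as tmp:
    with zipfile.ZipFile(Path(tmp) / "scala-library-demo.jar", "w") as zf:
        zf.writestr("scala/Function1.class", b"")
    with zipfile.ZipFile(Path(tmp) / "other-demo.jar", "w") as zf:
        zf.writestr("org/example/Foo.class", b"")
    print(jars_providing(tmp, "scala.Function1"))  # ['scala-library-demo.jar']
```

On a real install, a call such as `jars_providing("/usr/lib/flume/lib", "scala.Function1")` would list every jar in that directory carrying the class. More than one hit would point to duplicate Scala runtimes on the classpath.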
