I am doing twitter sentiment analysis using hadoop, flume and hive. I have created the table using
hive -f tweets.sql
tweets.sql
--create the tweets_raw table containing the records as received from Twitter
SET hive.support.sql11.reserved.keywords=false;
CREATE EXTERNAL TABLE Mytweets_raw (
id BIGINT,
created_at STRING,
source STRING,
favorited BOOLEAN,
retweet_count INT,
retweeted_status STRUCT<
text:STRING,
user:STRUCT<screen_name:STRING,name:STRING>>,
entities STRUCT<
urls:ARRAY<STRUCT<expanded_url:STRING>>,
user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
hashtags:ARRAY<STRUCT<text:STRING>>>,
text STRING,
user STRUCT<
screen_name:STRING,
name:STRING,
friends_count:INT,
followers_count:INT,
statuses_count:INT,
verified:BOOLEAN,
utc_offset:INT,
time_zone:STRING>,
in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/flume/tweets';
-- create sentiment dictionary
CREATE EXTERNAL TABLE dictionary (
type string,
length int,
word string,
pos string,
stemmed string,
polarity string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/dictionary';
-- loading data to the table dictionary
load data inpath 'data/dictionary/dictionary.tsv' INTO TABLE dictionary;
CREATE EXTERNAL TABLE time_zone_map (
time_zone string,
country string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/time_zone_map';
-- loading data to the table time_zone_map
load data inpath 'data/time_zone_map/time_zone_map.tsv' INTO TABLE time_zone_map;
-- Clean up tweets
CREATE VIEW tweets_simple AS
SELECT
id,
cast ( from_unixtime( unix_timestamp(concat( '2014 ', substring(created_at,5,15)), 'yyyy MMM dd hh:mm:ss')) as timestamp) ts,
text,
user.time_zone
FROM Mytweets_raw
;
CREATE VIEW tweets_clean AS
SELECT
id,
ts,
text,
m.country
FROM tweets_simple t LEFT OUTER JOIN time_zone_map m ON t.time_zone = m.time_zone;
-- Compute sentiment
create view l1 as select id, words from Mytweets_raw lateral view explode(sentences(lower(text))) dummy as words;
create view l2 as select id, word from l1 lateral view explode( words ) dummy as word ;
create view l3 as select
id,
l2.word,
case d.polarity
when 'negative' then -1
when 'positive' then 1
else 0 end as polarity
from l2 left outer join dictionary d on l2.word = d.word;
create table tweets_sentiment as select
id,
case
when sum( polarity ) > 0 then 'positive'
when sum( polarity ) < 0 then 'negative'
else 'neutral' end as sentiment
from l3 group by id;
-- put everything back together and re-name sentiments...
CREATE TABLE tweetsbi
AS
SELECT
t.*,
s.sentiment
FROM tweets_clean t LEFT OUTER JOIN tweets_sentiment s on t.id = s.id;
-- data with tweet counts.....
CREATE TABLE tweetsbiaggr
AS
SELECT
country,sentiment, count(sentiment) as tweet_count
FROM tweetsbi
group by country,sentiment;
-- store data for analysis......
CREATE VIEW A as select country,tweet_count as positive_response from tweetsbiaggr where sentiment='positive';
CREATE VIEW B as select country,tweet_count as negative_response from tweetsbiaggr where sentiment='negative';
CREATE VIEW C as select country,tweet_count as neutral_response from tweetsbiaggr where sentiment='neutral';
CREATE TABLE tweetcompare as select A.*,B.negative_response as negative_response,C.neutral_response as neutral_response from A join B on A.country= B.country join C on B.country=C.country;
-- permission to show data in Excel sheet for analysis ....
grant SELECT ON TABLE tweetcompare to user hue;
grant SELECT ON TABLE tweetcompare to user root;
-- for Tableau or Excel
-- UDAF sentiscore = sum(sentiment)*50 / count(sentiment)
-- context n-gram made readable
While executing query
SELECT t.retweeted_screen_name, sum(retweets) AS total_retweets, count(*) AS tweet_count FROM (SELECT retweeted_status.user.screen_name as retweeted_screen_name, retweeted_status.text, max(retweet_count) as retweets FROM mytweets GROUP BY retweeted_status.user.screen_name, retweeted_status.text) t GROUP BY t.retweeted_screen_name ORDER BY total_retweets DESC LIMIT 10;
this error shows:
Query ID = root_20161114140028_852cb526-011f-4a25-95c8-8c6587a88759
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
java.io.FileNotFoundException: File does not exist: hdfs://localhost:9000/tmp/e70ec3c9-14c7-41e9-ad11-2d4528057e47_resources/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
at org.apache.hadoop.mapreduce.JobResourceUploader.uploadFiles(JobResourceUploader.java:179)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:98)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:193)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:433)
at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:138)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:197)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1858)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1562)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1313)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1084)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1072)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:232)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:183)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:399)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:776)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:714)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:641)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Job Submission failed with exception 'java.io.FileNotFoundException(File does not exist: hdfs://localhost:9000/tmp/e70ec3c9-14c7-41e9-ad11-2d4528057e47_resources/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. File does not exist: hdfs://localhost:9000/tmp/e70ec3c9-14c7-41e9-ad11-2d4528057e47_resources/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar
hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/usr/lib/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:;databaseName=/usr/lib/warehouse/metastore_db;create=true </value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>256000000</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>1009</value>
</property>
</configuration>
mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.job.reduces</name>
<value>1</value>
</property>
</configuration>
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
/etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
However, I have added the jar file to hive, the same error shows :
ADD JAR file:///usr/lib/hive/lib/json-serde-1.3.8-SNAPSHOT-jar-with-dependencies.jar;
Please help me fix this.