I would like to populate a database of Postgresql by using a mapper with MrJob and Hadoop 2.7.1. I currently using the following code:
# -*- coding: utf-8 -*-
#Script for storing the sparse data into a database by using Hadoop
import psycopg2
import re
from mrjob.job import MRJob
args_d = False
args_c = True
args_s = True
args_n = 'es_word_space'
def unicodize(segment):
if re.match(r'\\u[0-9a-f]{4}', segment):
return segment.decode('unicode-escape')
return segment.decode('utf-8')
def create_tables(cr):
cr.execute("create table word_list(id serial primary key, word character varying not null)")
cr.execute("""create table word_sparse(
id serial primary key,
word_id integer references word_list(id) not null,
pos integer not null,
val float not null)""")
def delete_tables(cr):
cr.execute("drop table word_sparse")
cr.execute("drop table word_list")
class MRwordStore(MRJob):
def mapper(self, _, line):
global cr
item = line.strip().split('\t')
replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0])))
key = u''.join((c for c in replaced if c != '"'))
cr.execute("insert into word_list(word) values(%s) returning id", (key,))
word_id = cr.fetchone()[0]
#Parse the list, literal_eval is avoided because of memory issues
inside = False
number = ""
pos = 0
val = 0
for c in item[1]:
if c == '[':
inside = True
elif c.isdigit():
number += c
elif c == ',':
if inside:
pos = int(number)
number = ""
elif c == ']':
if inside:
val = int(number)
number = ""
cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val))
inside = False
if __name__ == "__main__":
"""
Stores words in the database.
The first time, run with the arguments -cs.
If the database has to be recreated, run again with the d argument (-dcs)
It also asumes the owner of the database is a user named semeval with password semeval
"""
global cr
conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n)
cr = conn.cursor()
if args_d:
delete_tables(cr)
if args_c:
create_tables(cr)
if args_s:
MRwordStore().run()
conn.commit()
conn.close()
I tried to use not reducer. By calling my script I have this output:
$ python db_store_hadoop.py -r hadoop /almac/ignacio/data/wdSp_sparse.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/db_store_hadoop.hduser.20160113.012419.718376
writing wrapper script to /tmp/db_store_hadoop.hduser.20160113.012419.718376/setup-wrapper.sh
Using Hadoop version 2.7.1
Copying local files into hdfs:///user/hduser/tmp/mrjob/db_store_hadoop.hduser.20160113.012419.718376/files/
PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
and there is not more, it seems to be hanged. Here is a sample of my input file:
"\u00e1gil" [[1572, 1], [1590, 1], [4, 1], [774, 1]]
"\u00e1guila" [[10, 5], [1116, 2], [15, 1], [1590, 1], [1641, 2], [1704, 1], [1740, 3], [183, 1], [3, 1], [428, 2], [900, 3]]
"\u00e1guilas" [[1043, 1], [248, 1], [618, 1], [701, 2], [862, 2], [864, 2]]
"\u00e1lava" [[1572, 1], [1576, 2], [1590, 1], [726, 2]]
which is 1.5gB length. I already created the database and it is empty. Thank you very much for your help because I think probably there are many misconceptions.