How to populate a postgresql database with Mrjob a

Posted 2019-08-06 07:11

Question:

I would like to populate a PostgreSQL database from a mapper, using mrjob and Hadoop 2.7.1. I am currently using the following code:

# -*- coding: utf-8 -*-
#Script for storing the sparse data into a database by using Hadoop
import psycopg2
import re
from mrjob.job import MRJob

args_d = False
args_c = True
args_s = True
args_n = 'es_word_space'


def unicodize(segment):
    if re.match(r'\\u[0-9a-f]{4}', segment):
        return segment.decode('unicode-escape')
    return segment.decode('utf-8')

def create_tables(cr):
    cr.execute("create table word_list(id serial primary key, word character varying not null)")
    cr.execute("""create table word_sparse(
        id serial primary key, 
        word_id integer references word_list(id) not null,
        pos integer not null,
        val float not null)""")

def delete_tables(cr):
    cr.execute("drop table word_sparse")
    cr.execute("drop table word_list")

class MRwordStore(MRJob):
    def mapper(self, _, line):
        global cr

        item = line.strip().split('\t')
        replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0])))
        key = u''.join((c for c in replaced if c != '"'))

        cr.execute("insert into word_list(word) values(%s) returning id", (key,))
        word_id = cr.fetchone()[0]

        # Parse the list; literal_eval is avoided because of memory issues
        inside = False
        number = ""
        pos = 0
        val = 0
        for c in item[1]:
            if c == '[':
                inside = True
            elif c.isdigit():
                number += c
            elif c == ',':
                if inside:
                    pos = int(number)
                    number = ""
            elif c == ']':
                if inside:
                    val = int(number)
                    number = ""
                    cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val))
                inside = False

if __name__ == "__main__":
    """
    Stores words in the database.

    The first time, run with the arguments -cs.
    If the database has to be recreated, run again with the d argument (-dcs)

    It also assumes the owner of the database is a user named semeval with password semeval
    """
    global cr

    conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n)
    cr = conn.cursor()
    if args_d:
        delete_tables(cr)
    if args_c:
        create_tables(cr)
    if args_s:
        MRwordStore().run()

    conn.commit()
    conn.close()

I tried not to use a reducer. When I run my script, I get this output:

$ python db_store_hadoop.py -r hadoop /almac/ignacio/data/wdSp_sparse.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/db_store_hadoop.hduser.20160113.012419.718376
writing wrapper script to /tmp/db_store_hadoop.hduser.20160113.012419.718376/setup-wrapper.sh
Using Hadoop version 2.7.1
Copying local files into hdfs:///user/hduser/tmp/mrjob/db_store_hadoop.hduser.20160113.012419.718376/files/

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Nothing more is printed after that; the job seems to hang. Here is a sample of my input file:

"\u00e1gil" [[1572, 1], [1590, 1], [4, 1], [774, 1]]
"\u00e1guila"   [[10, 5], [1116, 2], [15, 1], [1590, 1], [1641, 2], [1704, 1], [1740, 3], [183, 1], [3, 1], [428, 2], [900, 3]]
"\u00e1guilas"  [[1043, 1], [248, 1], [618, 1], [701, 2], [862, 2], [864, 2]]
"\u00e1lava"    [[1572, 1], [1576, 2], [1590, 1], [726, 2]]

The file is 1.5 GB in size. I have already created the database, and it is empty. Thank you very much for your help; I suspect I have several misconceptions.

Answer 1:

On Hadoop the mappers run as separate task processes, so each mapper needs its own database connection. Create the connection in mapper_init() and close it in mapper_final(). Create the database and its tables separately from the mrjob script, before the job runs. The script also does not start the job the way mrjob expects. Try some very simple mrjob scripts first and work through the examples in the documentation.