Flask: passing around background worker job (rq, r

Posted 2019-03-16 05:43

Question:

I want to do a very simple thing: launch a worker to do something and then return the answer to the user. I'm trying to do so using a combination of Flask and RQ.

import os
from flask import Flask, session
from somewhere import do_something
from rq import Queue
from worker import conn

app = Flask(__name__)
app.debug = True
app.secret_key = '....'

q = Queue(connection=conn)

@app.route('/make/')
def make():
    job = q.enqueue(do_something, 'argument')
    session['job'] = job
    return 'Done'

@app.route('/get/')
def get():
    try:
        session['job'].refresh()
        out = str(session['job'].result)
    except:
        out = 'No result yet'
    return out

The idea in this very simple example is that people go to /make/ and the job starts. After some time they can go to /get/ and the result from the worker will be printed there.

However one line is causing problems:

session['job'] = job

It seems the job cannot be pickled, and pickle is apparently what the Flask session uses. I'm getting the error:

...
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/app.py", line 804, in save_session
10:52:16 web.1     |     return self.session_interface.save_session(self, session, response)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/sessions.py", line 205, in save_session
10:52:16 web.1     |     secure=secure, domain=domain)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 329, in save_cookie
10:52:16 web.1     |     data = self.serialize(session_expires or expires)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 235, in serialize
10:52:16 web.1     |     self.quote(value)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 192, in quote
10:52:16 web.1     |     value = cls.serialization_method.dumps(value)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
10:52:16 web.1     |     raise TypeError, "can't pickle %s objects" % base.__name__
10:52:16 web.1     | TypeError: can't pickle function objects

I really hope someone can help. I might be doing this in a completely wrong way (passing the job via a session), but I have no idea how else to access the result of the job...

Any help will be highly appreciated.

Thanks in advance.

Answer 1:

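I've not used rq before, but I see that a job has an .id property (its .key is the Redis key derived from that ID). It might be easier to store that string in your session. Then you can use the Job class's .fetch classmethod, which loads the job from Redis by ID and returns it to you. Reading .result at that point gives the job's return value, or None if the job has not finished yet.

Maybe like this (untested):

from rq.job import Job
from rq.exceptions import NoSuchJobError

@app.route('/make/')
def make():
    job = q.enqueue(do_something, 'argument')
    # Store only the job ID (a plain string), not the Job object.
    session['job'] = job.id
    return 'Done'

@app.route('/get/')
def get():
    try:
        # Job.fetch is a classmethod that loads the job from Redis by ID.
        job = Job.fetch(session['job'], connection=conn)
    except (KeyError, NoSuchJobError):
        return 'No such job'
    # result is None until the worker has finished the job.
    if job.result is None:
        return 'No result yet'
    return str(job.result)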
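
The pattern in this answer, keeping only a string identifier in the session and looking the job up again on the next request, can be tried out without Redis. This is only a sketch: an in-memory dict (JOB_STORE) stands in for the Redis-backed storage, a plain dict stands in for the Flask session, and FakeJob, enqueue and fetch are hypothetical names, not rq's API.

```python
import uuid

# Hypothetical in-memory stand-in for the Redis storage that rq uses.
JOB_STORE = {}


class FakeJob:
    """Hypothetical job record; not rq's real Job class."""

    def __init__(self, func, args):
        self.id = str(uuid.uuid4())
        self.func = func
        self.args = args
        self.result = None  # stays None until the job has run

    def perform(self):
        self.result = self.func(*self.args)


def enqueue(func, *args):
    """Create a job, keep it server-side, and return it."""
    job = FakeJob(func, args)
    JOB_STORE[job.id] = job
    return job


def fetch(job_id):
    """Look a job up again by its ID; raises KeyError if unknown."""
    return JOB_STORE[job_id]


session = {}  # stands in for the Flask session

# "/make/": only the ID string goes into the session.
session['job'] = enqueue(str.upper, 'argument').id

# A worker would normally run the job in another process.
fetch(session['job']).perform()

# "/get/": the ID is enough to recover the job and its result.
result = fetch(session['job']).result
```

The session only ever holds a short string, so it does not matter how the session is serialized; the job object itself stays on the server side.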


Answer 2:

The problem is with serializing the arguments (you are actually trying to serialize a function object, which is impossible with pickle).

Try:

@app.route('/make/')
def make():
    job = q.enqueue(func=do_something, args=('argument',))
    session['job'] = job
    return 'Done'
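
The point about function objects can be checked without Flask or RQ. A minimal sketch, assuming Python 3 (where the error message differs from the Python 2.7 traceback in the question) and a hypothetical FakeJob class that merely holds a callable, as a stand-in for rq's Job:

```python
import pickle


class FakeJob:
    """Hypothetical stand-in for a job object; not rq's real Job class."""

    def __init__(self, job_id, func):
        self.id = job_id
        # A reference to a callable that pickle cannot look up by name
        # makes the whole object unpicklable.
        self.func = func


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


job = FakeJob("abc123", lambda text: text.upper())

job_pickles = can_pickle(job)    # False: the lambda attribute blocks pickling
id_pickles = can_pickle(job.id)  # True: a plain string is fine
```

So whatever goes into the session should be plain data such as the ID string, never the object that carries the function.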