I'm working with Python 2.7, Celery and cx_Oracle to access an Oracle database.
I create a lot of tasks. Each task runs a query through cx_Oracle, and many of these tasks run simultaneously. All tasks should share the same database connection.
If I launch only one task, the query runs correctly. However, if I launch several tasks, I start getting this error message:
```
[2016-04-04 17:12:43,846: ERROR/MainProcess] Task tasks.run_query[574a6e7f-f58e-4b74-bc84-af4555af97d6] raised unexpected: DatabaseError('<cx_Oracle._Error object at 0x7f9976635580>',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/ric/workspace/dbw_celery/tasks.py", line 39, in run_query
    column_names = get_column_names(oracle_conn, table_info["table_name"])
  File "/home/ric/workspace/dbw_celery/utilities.py", line 87, in get_column_names
    cursor.execute(query_str)
DatabaseError: <cx_Oracle._Error object at 0x7f9976635580>
```
Let's look at my code now.
This is my tasks.py file, where I create the Oracle database connection and the Celery instance, and define the tasks that will use that database connection:
```python
# tasks.py
from datetime import timedelta

import celeryconfig
from celery import Celery
from utilities import (connect_to_db, get_column_names, get_new_rows,
                       write_result_to_output_file)

# Define a Celery instance
dbwapp = Celery('tasks')
dbwapp.config_from_object(celeryconfig)
dbwapp.conf["CELERYBEAT_SCHEDULE"] = {}

# Define an Oracle connection as a global variable to be used by all tasks
# (db_user, db_pass, db_host, db_port and db_name are defined elsewhere;
# omitted from this simplified example)
oracle_conn = connect_to_db(db_user, db_pass, db_host, db_port, db_name)


# Define the task function that each Celery worker will run
@dbwapp.task()
def run_query(table_info, output_description):
    """Run a query on a given table. Writes found rows to output file."""
    # Reading a module-level name needs no `global` statement
    column_names = get_column_names(oracle_conn, table_info["table_name"])
    new_rows = get_new_rows(oracle_conn, table_info)
    write_result_to_output_file(output_description, new_rows)


def load_celerybeat_schedule(table_config, output_description):
    """Loads the CELERYBEAT_SCHEDULE dictionary with the tasks to run."""
    new_task_dict = {
        "task": "tasks.run_query",
        "schedule": timedelta(seconds=table_config["check_interval"]),
        "args": (table_config, output_description)
    }
    new_task_name = "task-" + table_config["table_name"]
    dbwapp.conf["CELERYBEAT_SCHEDULE"][new_task_name] = new_task_dict
```
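`load_celerybeat_schedule` registers one beat entry for a single table. A minimal sketch of building the whole schedule for several tables at once, assuming each table configuration is a dict with the `table_name` and `check_interval` keys used above; `build_beat_schedule`, the table names and `"output.csv"` are hypothetical example values:

```python
from datetime import timedelta


def build_beat_schedule(table_configs, output_description):
    """Return a CELERYBEAT_SCHEDULE-style dict with one entry per table.

    Hypothetical helper: each table config is assumed to be a dict with
    "table_name" and "check_interval" (in seconds) keys.
    """
    schedule = {}
    for table_config in table_configs:
        schedule["task-" + table_config["table_name"]] = {
            "task": "tasks.run_query",
            "schedule": timedelta(seconds=table_config["check_interval"]),
            "args": (table_config, output_description),
        }
    return schedule


tables = [
    {"table_name": "ORDERS", "check_interval": 30},
    {"table_name": "CUSTOMERS", "check_interval": 60},
]
schedule = build_beat_schedule(tables, "output.csv")
print(sorted(schedule))  # ['task-CUSTOMERS', 'task-ORDERS']
```

The returned dict could then be assigned to `dbwapp.conf["CELERYBEAT_SCHEDULE"]` in one step instead of being filled entry by entry.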
This is how I connect to the database in the utilities.py file:
```python
# utilities.py
import logging

import cx_Oracle

logger = logging.getLogger(__name__)


def connect_to_db(db_user, db_password, db_host, db_port, db_name):
    """Connect to DB."""
    connection_str = "%s/%s@%s:%s/%s" % (db_user, db_password, db_host, db_port, db_name)
    try:
        db_connection = cx_Oracle.connect(connection_str)
    except cx_Oracle.DatabaseError:
        logger.error("Couldn't connect to DB %s" % db_name)
        return None
    logger.info("Successfully connected to the DB: %s" % db_name)
    return db_connection
```
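In tasks.py, `connect_to_db` runs once, when the module is imported. With Celery's default prefork pool, the worker's child processes are forked after that import, so they may all inherit the same underlying connection. A minimal sketch of opening the connection lazily, at most once per process, instead; `PerProcessConnection` is a hypothetical helper, and `sqlite3` stands in for cx_Oracle so the example runs without an Oracle client:

```python
import os
import sqlite3


class PerProcessConnection(object):
    """Open a connection lazily, at most once per OS process (hypothetical helper)."""

    def __init__(self, connect):
        # connect: zero-argument factory, e.g. a lambda wrapping connect_to_db(...)
        self._connect = connect
        self._conn = None
        self._pid = None

    def get(self):
        pid = os.getpid()
        # A connection opened in another process (e.g. inherited from the
        # parent through fork) is never reused; this process opens its own.
        if self._conn is None or self._pid != pid:
            self._conn = self._connect()
            self._pid = pid
        return self._conn


# sqlite3 is only a runnable stand-in for cx_Oracle here.
db = PerProcessConnection(lambda: sqlite3.connect(":memory:"))
conn = db.get()
print(conn is db.get())  # True: same process, same connection
```

In a task, `db.get()` would replace the module-level `oracle_conn`, with a factory such as `lambda: connect_to_db(db_user, db_pass, db_host, db_port, db_name)`. Celery's `worker_process_init` signal is another place where each pool process could open its own connection.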
This is the `get_new_rows` function, also defined in utilities.py, where the query actually gets run:
```python
# utilities.py
def get_new_rows(db_connection, table_info):
    """Return new rows inserted in a given table since last check."""
    cursor = db_connection.cursor()
    query_str = "SELECT * FROM {0}".format(table_info["table_name"])
    cursor.execute(query_str)
    new_rows = cursor.fetchall()
    cursor.close()
    return new_rows
```
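`get_new_rows` only closes its cursor explicitly when `execute` and `fetchall` both succeed; if `execute` raises a `DatabaseError`, the `close()` call is skipped. A sketch of the same function using `contextlib.closing`, which works with any DB-API 2.0 cursor; `sqlite3` and the `orders` table are used here only as a runnable stand-in for cx_Oracle and a real table:

```python
import sqlite3
from contextlib import closing


def get_new_rows(db_connection, table_info):
    """Return all rows of a table; the cursor is closed even on errors."""
    # Table names cannot be passed as bind variables, so table_info["table_name"]
    # must come from trusted configuration, not from user input.
    query_str = "SELECT * FROM {0}".format(table_info["table_name"])
    with closing(db_connection.cursor()) as cursor:
        cursor.execute(query_str)
        return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, item TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, "pen"), (2, "ink")])
rows = get_new_rows(conn, {"table_name": "orders"})
print(sorted(rows))  # [(1, 'pen'), (2, 'ink')]
```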
I run my code like this: `celery -A tasks worker -B`
I have tried to simplify my code in order to make it easier to understand.
I suspect that the error is caused by different tasks running simultaneously and sharing the same database connection, so that their queries get "mixed up" on that connection.
What is the correct way to share a database connection between different Celery tasks?
Does anybody know what I'm doing wrong?
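If you wish to have multiple threads sharing the same connection, you need to enable threaded mode by passing `threaded=True` when you create the connection, for example `cx_Oracle.connect(connection_str, threaded=True)`.
If you don't, you can run into some interesting problems!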
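The cx_Oracle documentation describes `threaded=True` as wrapping access to the connection in a mutex. The same idea can be sketched explicitly with a lock around every use of a shared connection. `LockedConnection` is a hypothetical class, and `sqlite3` (opened with `check_same_thread=False`) stands in for cx_Oracle so the example runs without an Oracle client:

```python
import sqlite3
import threading


class LockedConnection(object):
    """Share one DB-API connection between threads, one query at a time."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = threading.Lock()

    def fetchall(self, query, params=()):
        # Only one thread at a time may create a cursor, execute and fetch,
        # so queries from different threads never interleave on the connection.
        with self._lock:
            cursor = self._conn.cursor()
            try:
                cursor.execute(query, params)
                return cursor.fetchall()
            finally:
                cursor.close()


raw = sqlite3.connect(":memory:", check_same_thread=False)
raw.execute("CREATE TABLE nums (n INTEGER)")
raw.executemany("INSERT INTO nums VALUES (?)", [(i,) for i in range(10)])
shared = LockedConnection(raw)

results = []


def worker():
    results.append(shared.fetchall("SELECT COUNT(*) FROM nums"))


threads = [threading.Thread(target=worker) for _ in range(8)]
for th in threads:
    th.start()
for th in threads:
    th.join()
print(results[0])  # [(10,)]
```

Serializing access keeps queries from interleaving, but tasks then wait on each other for the lock, so only one query runs on the connection at a time.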