python MySQLDB query timeout

Published 2019-02-01 02:44

Question:

I'm trying to enforce a time limit on queries in python MySQLDB. I have a situation where I have no control over the queries, but need to ensure that they do not run over a set time limit. I've tried using signal.SIGALRM to interrupt the call to execute, but this does not seem to work. The signal gets sent, but does not get caught until after the call to execute finishes.

I wrote a test case to prove this behavior:

#!/usr/local/bin/python2.6

import time
import signal

from somewhere import get_dbc

class Timeout(Exception):
    """ Time Exceded """

def _alarm_handler(*args):
    raise Timeout

dbc = get_dbc()

signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(1)

try:
    print "START:  ", time.time()
    dbc.execute("SELECT SLEEP(10)")
except Timeout:
    print "TIMEOUT!", time.time()'

The "SELECT SLEEP(10)" is simulating a slow query, but I do see the same behavior with an actual slow query.

The Result:

START:   1254440686.69
TIMEOUT! 1254440696.69

As you can see, it sleeps for the full 10 seconds, and only then do I get the Timeout exception.

Questions:

  1. Why do I not get the signal until after execute finishes?
  2. Is there another reliable way to limit query execution time?

Answer 1:

@nosklo's Twisted-based solution is elegant and workable, but if you want to avoid the dependency on Twisted, the task is still doable, e.g.:

import multiprocessing

def query_with_timeout(dbc, timeout, query, *a, **k):
  conn1, conn2 = multiprocessing.Pipe(False)
  subproc = multiprocessing.Process(target=do_query,
                                    args=(dbc, query, conn2)+a,
                                    kwargs=k)
  subproc.start()
  subproc.join(timeout)          # wait at most `timeout` seconds
  if conn1.poll():               # the child posted a result in time
    return conn1.recv()
  subproc.terminate()            # still stuck in execute(): kill it
  # TimeoutError is a builtin on Python 3; define your own on Python 2
  raise TimeoutError("Query %r ran for >%r" % (query, timeout))

def do_query(dbc, query, conn, *a, **k):
  cu = dbc.cursor()
  cu.execute(query, *a, **k)
  conn.send(cu.fetchall())       # hand the rows back through the pipe the parent polls
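For illustration, a hypothetical call site (get_dbc is the stand-in connection factory from the question; note that handing a live connection across the fork is driver-dependent, so you may prefer to create the connection inside do_query instead):

from somewhere import get_dbc  # stand-in from the question

dbc = get_dbc()
try:
    rows = query_with_timeout(dbc, 5, "SELECT SLEEP(10)")
except TimeoutError as exc:
    print(exc)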


Answer 2:

I've tried using signal.SIGALRM to interrupt the call to execute, but this does not seem to work. The signal gets sent, but does not get caught until after the call to execute finishes.

The MySQL client library handles interrupted system calls internally, so you won't see any side effect of SIGALRM until after the API call completes (short of killing the current thread or process).

You can try patching MySQL-Python to use the MYSQL_OPT_READ_TIMEOUT option (added in MySQL 5.0.25).
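More recent versions of the driver (mysqlclient, the maintained MySQLdb fork) ended up exposing this option directly as a read_timeout connect argument, so patching may no longer be necessary. A minimal sketch, assuming one of those versions:

import MySQLdb

# read_timeout maps onto MYSQL_OPT_READ_TIMEOUT: if the server does not
# answer within the given number of seconds, the client gives up and
# raises OperationalError instead of blocking forever.
dbc = MySQLdb.connect(host="localhost", user="me", passwd="secret",
                      db="test", read_timeout=5)
cur = dbc.cursor()
try:
    cur.execute("SELECT SLEEP(10)")  # server-side sleep outlives the timeout
except MySQLdb.OperationalError as exc:
    print("query aborted:", exc)

Note that after such a timeout the connection is generally no longer usable and should be reopened.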



Answer 3:

Why do I not get the signal until after execute finishes?

The query is executed through a C function, which blocks the Python VM from executing until it returns.

Is there another reliable way to limit query execution time?

This is (IMO) a really ugly solution, but it does work. You could run the query in a separate process (either via fork() or the multiprocessing module). Run the alarm timer in your main process, and when it fires, send a SIGINT or SIGKILL to the child process. If you use multiprocessing, you can use the Process.terminate() method.
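A minimal sketch of that idea, assuming the question's setup; it uses join's timeout plus Process.terminate() in place of an explicit SIGALRM, which serves the same purpose here:

import multiprocessing
from somewhere import get_dbc    # stand-in connection factory from the question

def run_query():
    dbc = get_dbc()                  # a child process needs its own connection
    dbc.execute("SELECT SLEEP(10)")  # the potentially slow query

proc = multiprocessing.Process(target=run_query)
proc.start()
proc.join(5)                         # give the query five seconds
if proc.is_alive():                  # still running past the deadline
    proc.terminate()                 # sends SIGTERM to the child
    proc.join()
    print("query timed out")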



Answer 4:

Use adbapi. It allows you to do a db call asynchronously.

from twisted.internet import reactor
from twisted.enterprise import adbapi

def bogusQuery():
    return dbpool.runQuery("SELECT SLEEP(10)")

def printResult(l):
    # function that would be called if it didn't time out
    for item in l:
        print item

def handle_timeout():
    # function that will be called when the query times out
    reactor.stop()

dbpool = adbapi.ConnectionPool("MySQLdb", user="me", password="myself", host="localhost", database="async")
bogusQuery().addCallback(printResult)
reactor.callLater(4, handle_timeout)
reactor.run()


Answer 5:

Generic notes

I've experienced the same issue lately, with several conditions I had to meet:

  • the solution must be thread safe
  • multiple connections to the database may be active from the same machine at the same time; kill exactly the one offending connection/query
  • the application holds connections to many different databases, so the handler must be portable across DB hosts

We had the following class layout (unfortunately I cannot post the real sources):

class AbstractModel: pass 
class FirstDatabaseModel(AbstractModel): pass # Connection to one DB host
class SecondDatabaseModel(AbstractModel): pass # Connection to one DB host

Several threads were then created for each model.


Solution (Python 3.2)

In our application, one model = one database. So I created a "service connection" for each model (so we could execute KILL over a parallel connection). Therefore, if one instance of FirstDatabaseModel was created, two database connections were opened; if five instances were created, only six connections were used:

class AbstractModel:
    _service_connection = None # Formal declaration

    def __init__(self):
        ''' Somehow load config and create connection
        '''
        self.config = ...  # load your configuration here (pseudocode)
        self.connection = MySQLFromConfig(self.config)
        self._init_service_connection()

        # Get connection ID (pseudocode)
        self.connection_id = self.connection.FetchOneCol('SELECT CONNECTION_ID()') 

    def _init_service_connection(self):
        ''' Initialize one singleton connection for model
        '''
        cls = type(self)
        if cls._service_connection is not None:
            return

        cls._service_connection = MySQLFromConfig(self.config)

Now we need a killer:

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)

    # Do your own connection check and renewal
    type(self)._service_connection.execute(sql)

Note: connection.execute = create cursor, execute, close cursor.
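That wrapper is not shown in the sources; a possible sketch of such an execute helper (hypothetical, since the real MySQLFromConfig is not posted, and the attribute name _raw_connection is an assumption):

def execute(self, sql):
    ''' Create a cursor, execute, close the cursor (as described above)
    '''
    cursor = self._raw_connection.cursor()  # assumed underlying DB-API connection
    try:
        cursor.execute(sql)
    finally:
        cursor.close()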

And make the killer thread-safe using threading.Lock:

def _init_service_connection(self):
    ''' Initialize one singleton connection for model
    '''
    cls = type(self)
    if cls._service_connection is not None:
        return

    cls._service_connection = MySQLFromConfig(self.config)
    cls._service_connection_lock = threading.Lock()

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)
    cls = type(self)

    # Do your own connection check and renewal
    with cls._service_connection_lock:
        cls._service_connection.execute(sql)

And finally, add a timed execution method using threading.Timer:

def timed_query(self, sql, timeout=5):
    kill_query_timer = threading.Timer(timeout, self._kill_connection)
    kill_query_timer.start()

    try:
        self.connection.execute(sql)  # the potentially long-running query
    finally:
        kill_query_timer.cancel()     # finished in time: disarm the killer
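A hypothetical call site, assuming the MySQLFromConfig wrapper surfaces the driver's OperationalError once the parallel KILL CONNECTION lands:

model = FirstDatabaseModel()
try:
    model.timed_query('SELECT SLEEP(60)', timeout=5)
except OperationalError:  # assumption: raised when our connection is killed
    print('query was killed after 5 seconds')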


Answer 6:

Why do I not get the signal until after execute finishes?

A process waiting on network I/O is in an uninterruptible state (a UNIX thing, not related to Python or MySQL). It gets the signal after the system call finishes (probably surfacing as an EINTR error code, although I am not sure).

Is there another reliable way to limit query execution time?

I think this is usually done by an external tool like mkill, which monitors MySQL for long-running queries and kills them.
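For illustration, a minimal sketch of the core idea behind such a watchdog (not mkill's actual implementation; it assumes a monitoring user allowed to read information_schema.processlist and to KILL other sessions' queries):

import time
import MySQLdb

def kill_long_queries(conn, max_seconds=30):
    cur = conn.cursor()
    cur.execute(
        "SELECT id FROM information_schema.processlist "
        "WHERE command = 'Query' AND time > %s", (max_seconds,))
    for (thread_id,) in cur.fetchall():
        cur.execute("KILL QUERY %s" % int(thread_id))  # abort the statement only

conn = MySQLdb.connect(host="localhost", user="monitor", passwd="secret")
while True:
    kill_long_queries(conn)
    time.sleep(5)  # poll interval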