-->

sqlalchemy: stopping a long-running query

2020-06-12 14:07发布

问题:

I have a seemingly straight-forward situation, but can't find a straight-forward solution.

I'm using sqlalchemy to query postgres. If a client timeout occurs, I'd like to stop/cancel the long running postgres queries from another thread. The thread has access to the Session or Connection object.

At this point I've tried:

session.bind.raw_connection().close()

and

session.connection().close()

and

session.close

and

session.transaction.close()

But no matter what I try, the postgres query still continues until it's end. I know this from watching pg in top. Shouldn't this be fairly easy to do? I'm I missing something? Is this impossible without getting the pid and sending a stop signal directly?

回答1:

This seems to work well, so far:

def test_close_connection(self):
    import threading
    from psycopg2.extensions import QueryCanceledError
    from sqlalchemy.exc import DBAPIError

    session = Session()
    conn = session.connection()
    sql = self.get_raw_sql_for_long_query()

    seconds = 5
    t = threading.Timer(seconds, conn.connection.cancel)
    t.start()

    try:
        conn.execute(sql)
    except DBAPIError, e:
        if type(e.orig) == QueryCanceledError:
            print 'Long running query was cancelled.'
    t.cancel()

source



回答2:

For those MySQL folks that may have ended up here, a modified version of this answer that kills the query from a second connection can work. Essentially the following, assuming pymysql under the hood:

thread_id = conn1.connection.thread_id()
t = threading.Timer(seconds, lambda: conn2.execute("kill {}".format(thread_id)))

The original connection will raise pymysql.err.OperationalError. See this other answer for a neat way to create a long running query for testing.