
How do I efficiently do a bulk insert-or-update with SQLAlchemy?

Published 2020-06-03 20:30

Question:

I'm using SQLAlchemy with a Postgres backend to do a bulk insert-or-update. To try to improve performance, I'm attempting to commit only once every thousand rows or so:

trans = engine.begin()
for i, rec in enumerate(records):
    if i % 1000 == 0:
        trans.commit()
        trans = engine.begin()
    try:
        inserter.execute(...)
    except sa.exceptions.SQLError:
        my_table.update(...).execute()
trans.commit()

However, this isn't working. It seems that when the INSERT fails, it leaves things in a weird state that prevents the UPDATE from happening. Is it automatically rolling back the transaction? If so, can this be stopped? I don't want my entire transaction rolled back in the event of a problem, which is why I'm trying to catch the exception in the first place.

The error message I'm getting, BTW, is "sqlalchemy.exc.InternalError: (InternalError) current transaction is aborted, commands ignored until end of transaction block", and it happens on the update().execute() call.

Answer 1:

You're hitting some weird PostgreSQL-specific behavior: if an error happens inside a transaction, the whole transaction is aborted and every subsequent command is rejected until you roll back. I consider this a Postgres design bug; it takes quite a bit of SQL contortionism to work around in some cases.

One workaround is to do the UPDATE first. Detect whether it actually modified a row by looking at cursor.rowcount; if it didn't modify any rows, the row didn't exist, so do the INSERT. (This will be faster if you update more frequently than you insert, of course.)
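For illustration, here's a minimal sketch of that update-first approach in SQLAlchemy; the connection, table, and column names (conn, my_table, id, value) are placeholders for your own schema:

result = conn.execute(
    my_table.update()
    .where(my_table.c.id == rec["id"])
    .values(value=rec["value"])
)
if result.rowcount == 0:
    # No existing row matched the key, so this must be a new record.
    conn.execute(my_table.insert().values(rec))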

Another workaround is to use savepoints:

SAVEPOINT a;
INSERT INTO ....;
-- on error:
ROLLBACK TO SAVEPOINT a;
UPDATE ...;
-- on success:
RELEASE SAVEPOINT a;

This has a serious problem for production-quality code: you have to detect the error accurately. Presumably you're expecting to hit a unique constraint violation, but you may hit something unexpected, and it may be next to impossible to reliably distinguish the expected error from the unexpected one. If the error is misidentified, you'll get obscure problems where nothing is updated or inserted and no error is ever raised. Be very careful with this. You can narrow the error case down by checking PostgreSQL's error code to make sure it's the error type you're expecting, but the potential problem is still there.
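As a sketch of that error-code check, assuming the psycopg2 driver (where '23505' is PostgreSQL's unique_violation code) and the same placeholder names as above:

import psycopg2.errorcodes
import sqlalchemy as sa

try:
    conn.execute(inserter, rec)
except sa.exc.DBAPIError as e:
    # e.orig is the underlying psycopg2 exception; only fall back to
    # UPDATE for the one error we actually expect (unique_violation).
    if getattr(e.orig, "pgcode", None) != psycopg2.errorcodes.UNIQUE_VIOLATION:
        raise
    conn.execute(my_table.update(...))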

Finally, if you really want to batch-insert-or-update, you actually want to do many rows in a few commands, not one row per command. This requires trickier SQL: a SELECT nested inside an INSERT, filtering out the right rows to insert and to update.
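One rough sketch of that idea, not the answerer's exact method: stage the batch in a temporary table, then update the rows that already exist and insert the rest. The table and column names (my_table, id, value) are placeholders.

from sqlalchemy import text

with engine.begin() as conn:
    # Stage the incoming batch; the temp table disappears at commit.
    conn.execute(text(
        "CREATE TEMP TABLE staging (LIKE my_table) ON COMMIT DROP"))
    # executemany: one statement, one round trip for the whole batch.
    conn.execute(
        text("INSERT INTO staging (id, value) VALUES (:id, :value)"),
        records)
    # Update the rows that already exist...
    conn.execute(text(
        "UPDATE my_table m SET value = s.value "
        "FROM staging s WHERE m.id = s.id"))
    # ...and insert the ones that don't.
    conn.execute(text(
        "INSERT INTO my_table (id, value) "
        "SELECT s.id, s.value FROM staging s "
        "WHERE NOT EXISTS (SELECT 1 FROM my_table m WHERE m.id = s.id)"))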



Answer 2:

This error is from PostgreSQL. Once one command in a transaction fails, PostgreSQL refuses to execute any further commands in that transaction. To get around this you can use nested transactions (implemented with SQL savepoints) via conn.begin_nested(). Here's something that might work. I made the code use explicit connections, factored out the chunking, and used context managers to manage the transactions correctly.

import sqlalchemy as sa
from itertools import chain, islice

def chunked(seq, chunksize):
    """Yields items from an iterator in chunks."""
    it = iter(seq)
    while True:
        try:
            first = next(it)  # stop cleanly once the iterator is exhausted
        except StopIteration:
            return
        yield chain([first], islice(it, chunksize - 1))

conn = engine.connect()
for chunk in chunked(records, 1000):
    with conn.begin():
        for rec in chunk:
            try:
                # Savepoint: if the INSERT fails, only this statement
                # is rolled back, not the whole chunk.
                with conn.begin_nested():
                    conn.execute(inserter, ...)
            except sa.exc.SQLAlchemyError:
                conn.execute(my_table.update(...))

This still won't have stellar performance, though, due to the nested-transaction overhead. If you want better performance, try to detect which rows will create errors beforehand with a SELECT query, and use executemany support (execute can take a list of dicts if all the inserts use the same columns). If you need to handle concurrent updates, you'll still need error handling, either by retrying or by falling back to one-by-one inserts.
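Here's a sketch of that select-then-executemany idea, again with placeholder names (my_table, a key column id, a payload column value). Note that without the retry/fallback handling mentioned above it isn't safe against concurrent writers:

from sqlalchemy import select

ids = [r["id"] for r in chunk]
existing = {row.id for row in conn.execute(
    select([my_table.c.id]).where(my_table.c.id.in_(ids)))}
to_insert = [r for r in chunk if r["id"] not in existing]
if to_insert:
    # executemany: a single INSERT statement run for every new row.
    conn.execute(my_table.insert(), to_insert)
for r in chunk:
    if r["id"] in existing:
        conn.execute(my_table.update()
                     .where(my_table.c.id == r["id"])
                     .values(value=r["value"]))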