Concurrent processes inserting data into a database


Question:

Consider the following schema in a Postgres database.

CREATE TABLE employee
(
  id_employee serial NOT NULL PRIMARY KEY,
  tx_email_address text NOT NULL UNIQUE,
  tx_passwd character varying(256)
);

I have a Java class which does the following:

conn.setAutoCommit(false);

ResultSet rs = stmt.executeQuery("select * from employee where tx_email_address = 'test1'");
if (!rs.next()) {
    stmt.executeUpdate("insert into employee (tx_email_address, tx_passwd) values ('test1', 'test1')");
}
rs = stmt.executeQuery("select * from employee where tx_email_address = 'test2'");
if (!rs.next()) {
    stmt.executeUpdate("insert into employee (tx_email_address, tx_passwd) values ('test2', 'test2')");
}
rs = stmt.executeQuery("select * from employee where tx_email_address = 'test3'");
if (!rs.next()) {
    stmt.executeUpdate("insert into employee (tx_email_address, tx_passwd) values ('test3', 'test3')");
}
rs = stmt.executeQuery("select * from employee where tx_email_address = 'test4'");
if (!rs.next()) {
    stmt.executeUpdate("insert into employee (tx_email_address, tx_passwd) values ('test4', 'test4')");
}

conn.commit();
conn.setAutoCommit(true);

The problem here is that if two or more concurrent instances of the above transaction try to write the same data, only one of them eventually succeeds and the rest throw an SQLException for a unique key constraint violation. How do we get around this?

PS: I have chosen only one table and simple insert queries to demonstrate the problem. My application is a Java-based application whose sole purpose is to write data to the target database. There can be concurrent processes doing so, and there is a very high probability that some of them will try to write the same data (as shown in the example above).

Answer 1:

The simplest way would seem to be to use the transaction isolation level 'serializable', which prevents phantom reads (other transactions inserting data that would satisfy a previous SELECT during your transaction).

if (!conn.getMetaData().supportsTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE)) {
    // OK, you're hosed: this driver does not support the isolation level
}
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
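
One caveat worth noting for PostgreSQL: under SERIALIZABLE the "losing" transaction may be aborted with a serialization failure (SQLState 40001), or it may still hit the unique violation (23505); either way the whole transaction has to be retried. A minimal retry sketch, where insertIfAbsent stands in for the select-then-insert logic from the question and is not a real API:

int attempts = 0;
while (true) {
    try {
        conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
        conn.setAutoCommit(false);
        insertIfAbsent(conn, "test1", "test1");
        conn.commit();
        break;                                   // success
    } catch (SQLException e) {
        conn.rollback();
        String state = e.getSQLState();
        // 40001 = serialization_failure, 23505 = unique_violation; both are safe
        // to retry for this insert-if-absent pattern.
        if (("40001".equals(state) || "23505".equals(state)) && ++attempts < 3) {
            continue;
        }
        throw e;
    } finally {
        conn.setAutoCommit(true);
    }
}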

There are also techniques like Oracle's "MERGE" statement -- a single statement which does 'insert or update', depending on whether the data's there. I don't know if Postgres has an equivalent, but there are techniques to 'fake it' -- see e.g. How to write INSERT IF NOT EXISTS queries in standard SQL.
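
For what it's worth, PostgreSQL 9.5 and later do provide an equivalent: INSERT ... ON CONFLICT. Assuming such a version, each select-then-insert pair from the question collapses into a single atomic statement, for example:

// Atomic "insert if not exists": no race window, no unique-violation exception.
String sql = "insert into employee (tx_email_address, tx_passwd) values (?, ?) "
           + "on conflict (tx_email_address) do nothing";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
    ps.setString(1, "test1");
    ps.setString(2, "test1");
    int inserted = ps.executeUpdate();   // 0 if the address was already present
}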



Answer 2:

I would first try to design the data flow in such a way that only one transaction ever gets a given instance of the data. In that scenario the "unique key constraint violation" should never happen and would therefore indicate a real problem.

Failing that, I would catch and ignore the "unique key constraint violation" after each insert. Of course, it might still be a good idea to log that it happened.

If neither approach is feasible for some reason, I would most probably create a transit table with the same structure as "employee", but without the primary key constraint and with an additional "transit status" field. No "unique key constraint violation" can ever happen on inserts into this transit table. A job would then be needed that reads the transit table and transfers the data into the "employee" table, using the "transit status" field to keep track of processed rows. I would let the job do the following on each run:

  • Execute an update statement on the transit table that sets the "transit status" to "work in progress" for a number of rows. How large that number is, or whether all currently new rows get marked, needs some thought.
  • Execute an update statement that sets the "transit status" to "duplicate" for all rows whose data is already in the "employee" table and whose "transit status" is not in ("duplicate", "processed").
  • Repeat as long as there are rows in the transit table with "transit status" = "work in progress":
    • Select a row from the transit table with "transit status" = "work in progress".
    • Insert that row's data into the "employee" table.
    • Set this row's "transit status" to "processed".
    • Update all rows in the transit table that have the same data as the currently processed row and "transit status" = "work in progress" to "transit status" = "duplicate".

I would most probably also want another job that regularly deletes the rows with "transit status" in ("duplicate", "processed").

If Postgres does not support database jobs, an OS-side scheduled job would do.
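
A condensed, set-based sketch of that job against the question's schema (table, column and status names here are illustrative; the DISTINCT ON folds the row-by-row loop and its duplicate marking into a single statement):

// One-time setup: a transit table like "employee" but with no unique constraints,
// plus a status column.
stmt.executeUpdate(
    "create table employee_transit (" +
    "  tx_email_address text not null," +
    "  tx_passwd character varying(256)," +
    "  transit_status text not null default 'new')");

// Step 1: claim the new rows for this job run.
stmt.executeUpdate(
    "update employee_transit set transit_status = 'work in progress' " +
    "where transit_status = 'new'");

// Step 2: mark rows whose address already exists in employee as duplicates.
stmt.executeUpdate(
    "update employee_transit t set transit_status = 'duplicate' " +
    "where t.transit_status = 'work in progress' " +
    "and exists (select 1 from employee e " +
    "            where e.tx_email_address = t.tx_email_address)");

// Step 3: copy the remaining claimed rows into employee, then mark them processed.
stmt.executeUpdate(
    "insert into employee (tx_email_address, tx_passwd) " +
    "select distinct on (tx_email_address) tx_email_address, tx_passwd " +
    "from employee_transit where transit_status = 'work in progress'");
stmt.executeUpdate(
    "update employee_transit set transit_status = 'processed' " +
    "where transit_status = 'work in progress'");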



Answer 3:

A solution is to use a table-level exclusive lock, blocking other writers while still allowing concurrent reads, using the LOCK command. Pseudo-SQL:

select * from employee where tx_email_address = 'test1';
if not exists
   lock table employee in exclusive mode;
   select * from employee where tx_email_address = 'test1';
   if still not exists // it may have been inserted before the lock was acquired
      insert into employee values ('test1', 'test1');
      commit; // releases the exclusive lock

Note that using this method will block all other writes until the lock is released, lowering throughput.

If all inserts are dependent on a parent row, then a better approach is to lock only the parent row, serializing child inserts, instead of locking the whole table.
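
Translated to JDBC against the question's schema, that pseudocode might look roughly like this (a sketch with minimal error handling):

conn.setAutoCommit(false);
try (Statement stmt = conn.createStatement()) {
    ResultSet rs = stmt.executeQuery(
        "select 1 from employee where tx_email_address = 'test1'");
    if (!rs.next()) {
        // EXCLUSIVE mode blocks other writers but still allows concurrent reads.
        stmt.execute("lock table employee in exclusive mode");
        // Re-check: another transaction may have inserted before we acquired the lock.
        rs = stmt.executeQuery(
            "select 1 from employee where tx_email_address = 'test1'");
        if (!rs.next()) {
            stmt.executeUpdate(
                "insert into employee (tx_email_address, tx_passwd) values ('test1', 'test1')");
        }
    }
    conn.commit();                       // releases the table lock
} catch (SQLException e) {
    conn.rollback();
    throw e;
} finally {
    conn.setAutoCommit(true);
}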



Answer 4:

You could expose a public method that queues the write operations and handles queue concurrency, then have another method (running on a different thread, or in another process entirely) actually perform the writes serially.
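
A minimal sketch of that idea within a single JVM, using a BlockingQueue and one writer thread (the class and method names here are illustrative, not an existing API):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SerialWriter {
    private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
    private final Connection conn;

    public SerialWriter(Connection conn) {
        this.conn = conn;
        Thread writer = new Thread(this::drain, "employee-writer");
        writer.setDaemon(true);
        writer.start();
    }

    /** Called from any thread: only enqueues, never touches the database. */
    public void submit(String emailAddress, String passwd) {
        queue.add(new String[] { emailAddress, passwd });
    }

    /** Single consumer: check-then-insert is safe here because nobody else writes. */
    private void drain() {
        while (true) {
            try {
                String[] row = queue.take();
                try (PreparedStatement check = conn.prepareStatement(
                         "select 1 from employee where tx_email_address = ?");
                     PreparedStatement insert = conn.prepareStatement(
                         "insert into employee (tx_email_address, tx_passwd) values (?, ?)")) {
                    check.setString(1, row[0]);
                    try (ResultSet rs = check.executeQuery()) {
                        if (!rs.next()) {
                            insert.setString(1, row[0]);
                            insert.setString(2, row[1]);
                            insert.executeUpdate();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();             // logging stub
            }
        }
    }
}

Note that this only serializes writers inside one process; with several independent processes you would still need one of the database-level approaches above.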



Answer 5:

You could add concurrency control at the application level by making the code a critical section:

synchronized(lock) {
  // Code to perform selects / inserts within database transaction.
}

This way one thread is prevented from querying the table while another is querying and inserting into it. When the first thread completes, the second thread enters the synchronized block; at that point each select attempt will return data, and hence the thread will not attempt to insert.

EDIT:

In cases where you have multiple processes inserting into the same table, you could consider taking out a table lock when performing the transaction to prevent other transactions from commencing. This effectively does the same as the code above (i.e. serializes the two transactions), but at the database level. Obviously there are potential performance implications in doing this.



Answer 6:

One way to solve this particular problem is to ensure that the individual threads/instances process rows in a mutually exclusive manner. In other words, if instance 1 processes rows where tx_email_address = 'test1', then no other instance should process those rows again.

This can be achieved by generating a unique server id on instance startup and marking the rows to be processed with this server id, as follows:

<LOOP>

  1. adding 2 columns status and server_id to employee table.
  2. update employee set status='In Progress', server_id='<unique_id_for_instance>' where status='Uninitialized' and rownum<2
  3. commit
  4. select * from employee where server_id='<unique_id_for_instance>' and status='In Progress'
  5. process the rows selected in step 4.

<END LOOP>

Following the above sequence of steps ensures that the VM instances all get different rows to process and there is no deadlock. It is necessary to perform the update before the select to make the operation atomic; doing it the other way round can lead to concurrency issues.
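
Note that the rownum predicate in step 2 is Oracle syntax; the question's database is Postgres, where there is no UPDATE ... LIMIT, so the claim step needs a subquery instead. A rough sketch, assuming the two extra columns exist and a Postgres version that supports SKIP LOCKED (9.5+):

// Unique id for this instance, generated once at startup.
String serverId = java.util.UUID.randomUUID().toString();

// Claim one uninitialized row. SKIP LOCKED keeps concurrent claimers from
// waiting on each other or grabbing the same row.
stmt.executeUpdate(
    "update employee set status = 'In Progress', server_id = '" + serverId + "' " +
    "where id_employee = (select id_employee from employee " +
    "                     where status = 'Uninitialized' " +
    "                     limit 1 for update skip locked)");
conn.commit();

// Fetch and process the rows claimed by this instance.
ResultSet rs = stmt.executeQuery(
    "select * from employee where server_id = '" + serverId + "' " +
    "and status = 'In Progress'");
while (rs.next()) {
    // ... process the row ...
}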

Hope this helps



Answer 7:

An often used system is to have a primary key that is a UUID (Universally Unique Identifier) and a UUID generator; see http://jug.safehaus.org/ or similar things, Google has lots of answers.

This will prevent the unique key constraint violation from happening on the primary key.
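
For illustration only (this assumes id_employee is changed from serial to a uuid or text column), the key can be generated client-side with the JDK's own java.util.UUID rather than an external generator, so no two instances ever produce the same key:

String id = java.util.UUID.randomUUID().toString();
stmt.executeUpdate(
    "insert into employee (id_employee, tx_email_address, tx_passwd) " +
    "values ('" + id + "', 'test1', 'test1')");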

But that of course is only part of your problem: your tx_email_address would still have to be unique, and nothing solves that.

There is no way to prevent the constraint violation from happening; as long as you have concurrency you will run into it, and in itself it really is not a problem.