Notifying a Java application of Postgres changes

Posted 2020-03-24 08:27

Problem

I'm building a postgres database for a few hundred thousand products. I will set up an index (Solr or maybe ElasticSearch) to improve query times for complex search queries.

The question now is how to keep the index synchronized with the database.

In the past I had an application that polled the database periodically to check for updates that needed to be made, but that leaves the index outdated for some time (from the database update until the next poll picks it up).

I would prefer a solution in which the database notifies my (Java) application that something has changed, and the application then decides whether the index needs to be updated. More precisely, I would build a producer/consumer structure in which the replica receives notifications from postgres that something changed; if the change is pertinent to the indexed data, it is stored in a stack of updates to do. The consumer would consume this stack and build the documents to be stored in the index.
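
As a rough illustration of that producer/consumer hand-off, here is a minimal Java sketch. It is only a sketch under assumptions: the class name IndexUpdatePipeline, the "products" table filter and the in-memory list standing in for the index are all hypothetical, and it uses a FIFO queue rather than a literal stack so that updates are applied in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: hands change notifications from a producer to an indexing consumer.
final class IndexUpdatePipeline {
    // Thread-safe queue of product ids whose index documents need rebuilding.
    private final BlockingQueue<Long> pendingProductIds = new LinkedBlockingQueue<>();
    // Stands in for the real search index (Solr, ElasticSearch, ...).
    private final List<Long> indexedProductIds = new ArrayList<>();

    // Producer side: called once per change notification from the database.
    void onDatabaseChange(String table, long productId) {
        // Assumption: only changes to the "products" table affect the index.
        if ("products".equals(table)) {
            pendingProductIds.add(productId);
        }
    }

    // Consumer side: processes everything currently queued and returns the count.
    int drainPending() {
        int processed = 0;
        Long productId;
        while ((productId = pendingProductIds.poll()) != null) {
            // A real consumer would load the product and send a document to the index here.
            indexedProductIds.add(productId);
            processed++;
        }
        return processed;
    }

    List<Long> indexedProductIds() {
        return indexedProductIds;
    }
}
```

In a long-running application the consumer would more likely sit on its own thread and block on take() instead of draining on demand.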

Possible Solutions

One solution would be to write a kind of replica end-point, in which the application behaves like a postgres instance that replicates the data from the original database. Does anyone have experience with this approach?

What other solutions are there for this problem?

3 answers
smile是对你的礼貌
Reply #2 · 2020-03-24 09:06

To use LISTEN and NOTIFY of postgres you need a driver that can deliver asynchronous notifications. The standard postgres JDBC driver (PgJDBC) does not push notifications to your code; you have to poll for them with getNotifications().

To LISTEN on a channel continuously from an application server, go for the pgjdbc-ng 0.6 driver.

http://impossibl.github.io/pgjdbc-ng/

It supports async notifications, without polling.

We Are One
Reply #3 · 2020-03-24 09:09

What other solutions are there for this problem?

Use LISTEN and NOTIFY to tell your app that things have changed.

You can send the NOTIFY from a trigger that also records changes in a queue table.
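
A sketch of what such a trigger could look like, kept as SQL strings in a small Java helper so it can be installed over JDBC. Everything named here is an assumption for illustration: the products table with a bigint id column, the product_changes queue table, the channel name product_changes and the function and trigger names are hypothetical, not taken from the question.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: DDL for a queue table plus a trigger that records and announces changes.
final class ChangeQueueSchema {
    // Queue table: one row per changed product (assumed schema).
    static final String CREATE_QUEUE_TABLE =
        "CREATE TABLE IF NOT EXISTS product_changes ("
        + " id bigserial PRIMARY KEY,"
        + " product_id bigint NOT NULL,"
        + " operation text NOT NULL,"
        + " changed_at timestamptz NOT NULL DEFAULT now())";

    // Trigger function: stores the change, then notifies listeners with the product id as payload.
    static final String CREATE_FUNCTION =
        "CREATE OR REPLACE FUNCTION record_product_change() RETURNS trigger AS $$\n"
        + "DECLARE\n"
        + "  changed_id bigint;\n"
        + "BEGIN\n"
        + "  IF TG_OP = 'DELETE' THEN\n"
        + "    changed_id := OLD.id;\n"
        + "  ELSE\n"
        + "    changed_id := NEW.id;\n"
        + "  END IF;\n"
        + "  INSERT INTO product_changes (product_id, operation) VALUES (changed_id, TG_OP);\n"
        + "  PERFORM pg_notify('product_changes', changed_id::text);\n"
        + "  RETURN NULL;\n"
        + "END;\n"
        + "$$ LANGUAGE plpgsql";

    // Dropping first makes the installation repeatable.
    static final String DROP_TRIGGER =
        "DROP TRIGGER IF EXISTS products_notify ON products";

    static final String CREATE_TRIGGER =
        "CREATE TRIGGER products_notify"
        + " AFTER INSERT OR UPDATE OR DELETE ON products"
        + " FOR EACH ROW EXECUTE PROCEDURE record_product_change()";

    static final List<String> STATEMENTS =
        Arrays.asList(CREATE_QUEUE_TABLE, CREATE_FUNCTION, DROP_TRIGGER, CREATE_TRIGGER);

    // Runs the statements in order on an existing JDBC connection.
    static void install(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            for (String sql : STATEMENTS) {
                statement.execute(sql);
            }
        }
    }
}
```

Because the queue row is written in the same transaction as the change, a consumer that misses a notification (for example while it was disconnected) can still find the pending work in the table. The notification itself is only delivered once the transaction commits.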

You'll need a PgJDBC connection that has sent a LISTEN for the event(s) you're using. If you're using SSL, it must poll the database by sending periodic empty queries (""); if you are not using SSL, you can avoid this by using the async notification checks. You'll also need to unwrap the Connection object from your connection pool so that you can cast the underlying connection to a PgConnection and use listen/notify with it. See the related answer.

The producer/consumer bit will be harder. To have multiple crash-safe concurrent consumers in PostgreSQL you need to use advisory locking with pg_try_advisory_lock(...). If you don't need concurrent consumers it's easy: you just SELECT ... LIMIT 1 FOR UPDATE a row at a time.
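
For the simple single-consumer case, the row-at-a-time loop might look roughly like the following plain JDBC sketch. The product_changes queue table with id and product_id columns, the class name and the ChangeHandler callback are assumptions made up for the example.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical single consumer: takes one queue row per transaction.
final class SingleQueueConsumer {
    // Locks the oldest queue row until the transaction ends.
    static final String CLAIM_SQL =
        "SELECT id, product_id FROM product_changes ORDER BY id LIMIT 1 FOR UPDATE";
    static final String DELETE_SQL =
        "DELETE FROM product_changes WHERE id = ?";

    // Callback that updates the index for one product (assumed interface).
    interface ChangeHandler {
        void handle(long productId) throws Exception;
    }

    // Processes at most one queued change; returns false when the queue is empty.
    static boolean processNext(Connection connection, ChangeHandler handler) throws Exception {
        connection.setAutoCommit(false);
        try (Statement select = connection.createStatement();
             ResultSet row = select.executeQuery(CLAIM_SQL)) {
            if (!row.next()) {
                connection.rollback();
                return false;
            }
            long changeId = row.getLong("id");
            long productId = row.getLong("product_id");
            handler.handle(productId);
            // Remove the row only after the handler succeeded.
            try (PreparedStatement delete = connection.prepareStatement(DELETE_SQL)) {
                delete.setLong(1, changeId);
                delete.executeUpdate();
            }
            connection.commit();
            return true;
        } catch (Exception e) {
            // A failure leaves the row in the queue for the next attempt.
            connection.rollback();
            throw e;
        }
    }
}
```

If the consumer crashes mid-way, the transaction is rolled back by the server and the row stays in the queue, which is what makes this pattern crash-safe for a single consumer.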

An easier method of skipping locked rows with FOR UPDATE was still in development when this answer was written, in the hope that it would land in 9.4; it eventually shipped as FOR UPDATE SKIP LOCKED in PostgreSQL 9.5.

chillily
Reply #4 · 2020-03-24 09:16

In general, I would recommend implementing loose coupling using the EAI (Enterprise Application Integration) patterns. Then, if you decide to exchange the database, the code on the index side does not change.

If you want to stick with tight coupling, I would recommend using LISTEN/NOTIFY. In Java, it is important to use the pgjdbc-ng driver, because it supports async notifications without polling.

Here's an asynchronous pattern (based on this answer):

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;
import java.sql.Statement;

public class NotificationExample {

    public static void listenToNotifyMessage() {
        PGDataSource dataSource = new PGDataSource();
        dataSource.setHost("localhost");
        dataSource.setPort(5432);
        dataSource.setDatabase("database_name");
        dataSource.setUser("postgres");
        dataSource.setPassword("password");

        // Called by the driver whenever a NOTIFY arrives on a channel we LISTEN on.
        PGNotificationListener listener = (int processId, String channelName, String payload)
            -> System.out.println("notification = " + payload);

        try (PGConnection connection = (PGConnection) dataSource.getConnection()) {
            // Register the listener before issuing LISTEN so no notification is missed.
            connection.addNotificationListener(listener);
            try (Statement statement = connection.createStatement()) {
                statement.execute("LISTEN test");
            }
            // Notifications only arrive while the connection is open, so keep it open.
            while (true) {
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop listening.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}

From any other session, you can now execute NOTIFY test, 'This is a payload';. You can also execute NOTIFY from triggers etc.
