Using H2 database server how to notify changes to

2019-02-16 02:25发布

问题:

I am successfully using H2 database in AUTO_SERVER mode so that a database file is shared among a number of desktop clients on a network transparently. This way a server is elected among the clients and all other clients read from the tcp server.

What I'm missing is how a client or the server can notify all other desktop clients something has been changed in the database. Right now I'm using a JGroups channel to let all clients comunicate one with each other however this is another point of failure and another leader election algorithm which runs in parallel with H2.

Isn't there any other method? I have read about the JMS (Java Message Service Java API) which is supported in some databases. Any hint for H2?

Thanks

EDIT:

The following code is an adaptation of the current answer, if I start the Sender first (set args as "sender") he connects as server to the H2 database, then I execute Receiver (set args as "receiver") in remote machines and they connect as clients.

Yet only the server receives notifications, clients don't receive anything.

This makes sense from what I currently know: a trigger is only called on the server, a user defined function called from a client or server is called on the client or server but not across all clients (and server) connected to the database.

So is there a way to adapt the below to notify all connected instances of a change in the database?

import java.io.File;
import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDB2
{

    public static void main(String[] args) throws Exception
    {
        //final String url = "jdbc:h2:mem:test;multi_threaded=true";
        final String url = "jdbc:h2:" + File.separator + "mnt/testdir/PlanIGS" + File.separator
                + "persondb;create=true;AUTO_SERVER=TRUE;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();

        boolean isSender = false;
        args = new String[]
        {
            "sender"
        };
        for (String arg : args)
        {
            if (arg.contains("receiver"))
            {
                System.out.println("receiver starting");
                isSender = false;
            }
            else if (arg.contains("sender"))
            {
                System.out.println("sender starting");
                isSender = true;
            }
        }

        if (isSender)
        {
            stat.execute("create alias wait_for_change for \""
                    + TestSimpleDB2.class.getName()
                    + ".waitForChange\"");
            stat.execute("create table test(id identity)");
            stat.execute("create trigger notifier "
                    + "before insert, update, delete, rollback "
                    + "on test call \""
                    + TestSimpleDB2.Notifier.class.getName() + "\"");

            Thread.sleep(1000);
            for (int i = 0; i < 10; i++)
            {
                System.out.println("Sender: I change something...");
                stat.execute("insert into test values(null)");
                Thread.sleep(2000);
            }
        }
        else
        {
            new Thread()
            {
                public void run()
                {
                    try
                    {
                        Connection conn = DriverManager.getConnection(url);
                        for (int i = 0; i < 10; i++)
                        {
                            conn.createStatement().execute(
                                    "call wait_for_change(100000)");
                            System.out.println("Receiver: event received");
                        }
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        conn.close();
    }

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis)
    {
        synchronized (modCount)
        {
            try
            {
                modCount.wait(maxWaitMillis);
            }
            catch (InterruptedException e)
            {
                // ignore
            }
        }
    }

    public static class Notifier extends TriggerAdapter
    {

        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException
        {
            modCount.incrementAndGet();
            synchronized (modCount)
            {
                modCount.notifyAll();
            }
        }
    }
}

回答1:

H2 does not implement JMS (in fact I don't know of a database that does). However, you could build a simple notify mechanism within H2, using a trigger and a user defined function, as follows. Please note this would require the multi-threaded mode in H2, which is not fully tested yet. Because of that, it might make sense to use a separate database for messaging than the database you use for your data.

import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDb {

    public static void main(String[] args) throws Exception {
        final String url = "jdbc:h2:mem:test;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();
        stat.execute("create alias wait_for_change for \"" + 
                TestSimpleDb.class.getName() + 
                ".waitForChange\"");
        stat.execute("create table test(id identity)");
        stat.execute("create trigger notifier " + 
                "before insert, update, delete, rollback " +
                "on test call \"" + 
                TestSimpleDb.Notifier.class.getName() + "\"");
        new Thread() {
            public void run() {
                try {
                    Connection conn = DriverManager.getConnection(url);
                    for (int i = 0; i < 10; i++) {
                        conn.createStatement().execute(
                                "call wait_for_change(10000)");
                        System.out.println("Receiver: event received");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        Thread.sleep(500);
        for (int i = 0; i < 10; i++) {
            System.out.println("Sender: I change something...");
            stat.execute("insert into test values(null)");
            Thread.sleep(1000);
        }
        conn.close();
    }

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis) {
        synchronized (modCount) {
            try {
                modCount.wait(maxWaitMillis);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    public static class Notifier extends TriggerAdapter {
        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException {
            modCount.incrementAndGet();
            synchronized (modCount) {
                modCount.notifyAll();
            }
        }
    }

}