如何通过ZeroMQ(jzmq)3.xx的使用XPUB和XSUB实现与代理发布 - 订阅网络(how

2019-08-08 06:00发布

我试图用XPUB和XSUB在这个下面的图提供了实现。 我已经通过提供他们的榜样了,但不能得到一个XPUB和XSUB在Java中。 在这里 ,他们给C的一个例子是有点复杂,因为我是新来ZeroMQ。


我试图在采用了android使用它JNI包装版本 。 请帮我找到一个例子,如何用java实现与代理发布-订阅网 ZeroMQ。

目前,我指http://zguide.zeromq.org/page:all

我试图将它移植如下。 Subscriber.java

公共类用户扩展Thread实现Runnable { 

private static final String TAG = "Subscriber";
private Context ctx;

public Subscriber(ZMQ.Context z_context) {
    this.ctx = z_context;
}

@Override
public void run() {

    super.run();

    ZMQ.Socket mulServiceSubscriber = ctx.socket(ZMQ.SUB);
    mulServiceSubscriber.connect("tcp://localhost:6001");
    mulServiceSubscriber.subscribe("A".getBytes());
    mulServiceSubscriber.subscribe("B".getBytes()); 


        while (true) {
            Log.d(TAG, "Subscriber loop started..");
            String content = new String(mulServiceSubscriber.recv(0));
            Log.d(TAG, "Subscriber Received : "+content);
        }
}

}

Publisher.java

  公共类出版社继承Thread实现Runnable { 

private static final String TAG = "Publisher";
private Context ctx;

public Publisher(ZMQ.Context z_context) {
    this.ctx = z_context;
}

@Override
public void run() {

    super.run();

    ZMQ.Socket publisher = ctx.socket(ZMQ.PUB);
    publisher.connect("tcp://localhost:6000");

    while (true) {
        Log.d(TAG, "Publisher loop started..");
        publisher.send(("A Hello " + new Random(100).nextInt()).getBytes() , 0);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}

XListener.java(现在简单的转发)

公共类XListener继承Thread实现Runnable { 

private static final String TAG = null;
private Socket publisherX;
private Context ctx;
private Socket subscriberX;

public XListener(ZMQ.Context ctx, ZMQ.Socket subscriberX,
        ZMQ.Socket publisherX) {
    this.ctx = ctx;
    this.subscriberX = subscriberX;
    this.publisherX = publisherX;

}

@Override
public void run() {
    super.run();
    while (true) {          
        Log.d(TAG, "XListener loop started..");

        String msg = new String(subscriberX.recvStr());
        Log.v(TAG, "Listener Received: " +"MSG :"+msg);
        publisherX.send(msg.getBytes(), 0);         
    }
}

}

在应用程序的主()

  私人无效的main(){          ZMQ.Context CTX = ZMQ.context(1); 

ZMQ.Socket subscriberX = ctx.socket(ZMQ.XSUB); subscriberX.bind("tcp://*:6000"); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } ZMQ.Socket publisherX = ctx.socket(ZMQ.XPUB); publisherX.bind("tcp://*:6001"); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } new XListener(ctx, subscriberX, publisherX).start(); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } new XSender(ctx, subscriberX, publisherX).start(); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } new Subscriber(ctx).start(); new Publisher(ctx).start(); }

随着代码我不能听XSUB。 虽然移植espresso.c,我没能找到ZMQ的Java绑定任何包装。 如何实现一个简单的代理还是我失去了一些东西?

Answer 1:

哇,我回答我的问题。 我错过了从publisherX添加转发器来subscriberX。 这里是遗漏码。 现在XSUB和XPUB能够发送和获取数据。

公共类XSender继承Thread实现Runnable { 

private static final String TAG = null;
private Socket publisherX;
private Context ctx;
private Socket subscriberX;

public XSender(ZMQ.Context ctx, ZMQ.Socket subscriberX,
        ZMQ.Socket publisherX) {
    this.ctx = ctx;
    this.subscriberX = subscriberX;
    this.publisherX = publisherX;

}

@Override
public void run() {
    super.run();
    while (true) {
        // Read envelope with address
        Log.d(TAG, "XListener loop started..");

        String msg = new String(subscriberX.recv(0));
        Log.v(TAG, "Listener Received: " +"MSG :"+msg);
        publisherX.send(msg.getBytes(), 0);         

    }


}

}



文章来源: how to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(jzmq) 3.xx