在鼠兔或RabbitMQ的,我该如何检查是否有消费者目前消费?(In Pika or RabbitM

2019-07-02 10:28发布

我想检查,如果消费者/工人存在消耗留言我要发送。

如果没有任何工人 ,我将开始一些工人(消费者和出版商都在一台机器上),然后去发布消息

如果有像功能connection.check_if_has_consumers ,我就有点像这样实现它-

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                            properties=pika.BasicProperties(delivery_mode=2))
connection.close()

但我无法找到任何功能check_if_has_consumers鼠兔的功能。

有没有实现这一点,使用鼠兔的一些方法? 也许,通过谈话直接的兔子

我不能完全肯定,但我真的认为RabbitMQ的是意识的消费者订阅不同的队列数量,因为它群发留言 ,对他们接受的ACK

我刚刚开始的RabbitMQ 2小时前...任何帮助,欢迎...

这里是我写的,如果它的任何帮助的workers.py代码....

import multiprocessing
import pika


def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):    
        process = WorkerProcess()
        process.start()


class WorkerProcess(multiprocessing.Process):
    """
    worker process that waits infinitly for task msgs and calls
    the `callback` whenever it gets a msg
    """
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_working = multiprocessing.Event()

    def run(self):
        """
        worker method, open a channel through a pika connection and
        start consuming
        """
        connection = pika.BlockingConnection(
                              pika.ConnectionParameters(host='localhost')
                     )
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,
                                                    durable=True)

        # don't give work to one worker guy until he's finished
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():
            channel.transport.connection.process_data_events()

        channel.stop_consuming()
        connection.close()
        return 0

    def signal_exit(self):
        """exit when finished with current loop"""
        self.stop_working.set()

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        self.signal_exit()
        while self.is_alive(): # checking `is_alive()` on zombies kills them
            time.sleep(1)

    def kill(self):
        """kill now! should not use this, might create problems"""
        self.terminate()
        self.join()


def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body
    # do some heavy lifting here
    result = save_to_database(body)
    print 'DONE:', result
    channel.basic_ack(delivery_tag=method.delivery_tag)

编辑:

我必须继续前进所以这里是我要带,除非有更好的方法来一起解决办法,

因此,RabbitMQ的有以下HTTP管理API ,他们的工作你已经打开后管理插件 ,并在HTTP API页面的中间有

/ API /连接 - 所有打开的连接的列表。

/ API /连接/名称 - 单个连接。 它delete一个将关闭连接。

所以,如果我连我的工作人员和我通过不同的连接名/用户同时生产 ,我就可以检查工人的连接是开放的......(有可能是问题,当工人死亡...)

将等待一个更好的解决方案?

编辑:

刚刚发现这个在RabbitMQ的文档,但是这将是哈克在Python做:

shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue    0
...done.

所以我可以做这样的事情,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")

哈克......还是希望鼠有一些Python功能做到这一点...

谢谢,

Answer 1:

我只是在寻找到这一点。 通过源和文档阅读后,我在channel.py跨越下面就来:

@property
def consumer_tags(self):
    """Property method that returns a list of currently active consumers

    :rtype: list

    """
    return self._consumers.keys()

我自己的测试是成功的。 我用下面的在我的通道对象是self._channel:

if len(self._channel.consumer_tags) == 0:
        LOGGER.info("Nobody is listening.  I'll come back in a couple of minutes.")
        ...


Answer 2:

其实,我发现这个事故寻找一个不同的问题,但有一点是可以帮助你是在Basic_Publish功能,有一个参数“立即”,这是默认为假。

你可以做一个想法是立即标志设置为true,这将需要的,而不是坐在队列它由消费者立即被消耗。 如果工人不提供消费信息,它会踢回一个错误,告诉你要开始另一名工人。

根据吞吐量的系统,这将既可以产卵很多额外的工作人员,或产卵的工人来代替死去的工人。 对于前一个问题,你可以写一个管理类系​​统,只需通过控制队列,在那里你可以告诉一个“亚军”之类的过程杀工人说,现在不再需要的流程跟踪工人。



文章来源: In Pika or RabbitMQ, How do I check if any consumers are currently consuming?