Python的:实现并发的DNS请求(流水线)(Python: Implementing concu

2019-10-21 01:01发布

我想写发送多个DNS请求,使用每个请求一个不同的名称服务器python脚本。

实现连续的解决方案易于使用dnspython但对我来说太慢了。 使用线程池增加并发性是不可能的,因为在我的特殊情况下,所有的请求使用相同的源端口(REUSE_ADDRESS不会帮助这里要么)。

基于上述原因,我想使用下述溶液(开沟使用dnspython的解析器模块,但利用其信息建设的优势和解析模块):

  • 允许多达X请求是在进步
  • 同时发送X请求(只是使用UDP发送DNS请求报文。可能与增加的延迟之间发送,以避免脉冲串)
  • 不同的线程等待响应
  • 当响应到达时与所述请求(由地址)相匹配,并允许新的请求来运行
  • 如果对请求的响应不TIMEOUT秒内到达其标记为已完成,并允许运行一个新的请求

我在这里的主要问题是:

  • 如何轻松实现任务超时
  • 是否有可能实现它,而无需使用线程同步的(例如,使用事件循环?)
  • 是否有任何现有的库,可以帮助实现它(它真的感觉就像我想在这里重新发明轮子,我看着asycnio模块,但不能想出一个办法来利用它为我的问题)。 请注意,我不希望因为我需要改变的核心功能(例如,使用原始套接字,改变DNS报头字段等等)的灵活运用现有的DNS或网络库。

Answer 1:

你尝试aiodns包? https://pypi.python.org/pypi/aiodns/

对于超时ASYNCIO有标准wait_for协程( https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for )。



Answer 2:

使用一个简单的选择循环效果很好这里。 下面是一个完成代码片段:

def run(self, resolvers_iter):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    sock.setblocking(False)

    try:
        pending = []

        # For rate limiting
        limit = float(self.timeout)/self.max_pending  # delay between sends
        last_sent = clock() - limit

        # Work as long as there are more resolvers to query
        completed_sending = False
        while not completed_sending or pending:

            # Can I send more requests
            want_to_write = False
            if not completed_sending and len(pending) < self.max_pending:
                want_to_write = True

            # Calculate nearest timeout time to make sure select returns on time
            timeout = None
            if pending:
                timeout = self.timeout - clock() + pending[0][0] + 0.001
                timeout = max(timeout, 0)

            # Rate limit
            time_passed_since_send = clock() - last_sent
            if want_to_write and time_passed_since_send + 0.001 < limit:
                timeout = min(timeout, limit-time_passed_since_send)
                timeout = max(timeout, 0)
                want_to_write = False

            # Poll socket - uses internally the select module
            readable, writable = self._select(readable=True, writable=want_to_write, timeout=timeout)

            # Can read
            if readable:
                # Read as many as possible
                while True:
                    try:
                        # Get response
                        response, from_address = DnsFacilities.read_response(sock)

                        # Check if not duplicate or already timed out
                        sent_time = None
                        for i, (t, ip) in enumerate(pending):
                            if ip == from_address[0]:
                                sent_time = t
                                del pending[i]
                                break

                        if sent_time is not None:
                            self.response_received((response, from_address, clock()-sent_time))

                    except socket.error, e:
                        if e[0] in (socket.errno.EWOULDBLOCK, socket.errno.EAGAIN):
                            break
                        elif e[0] in (socket.errno.WSAECONNRESET, socket.errno.WSAENETRESET):
                            pass
                        else:
                            raise

            # Can write
            if writable:
                try:
                    last_sent = clock()
                    resolver_address = resolvers_iter.next()
                    DnsFacilities.send_query(resolver_address)
                    pending.append((clock(), resolver_address)
                except StopIteration:
                    completed_sending = True

            # Check for timed out tasks
            now = clock()
            while pending and now - pending[0][0] > self.timeout:
                self.response_timeout(pending[0][1])
                del pending[0]

    finally:
        sock.close()


文章来源: Python: Implementing concurrent DNS requests (pipelining)