I'm trying to write a Python script that sends multiple DNS requests, using a different name server for each request.
Implementing a sequential solution is easy using dnspython, but it is too slow for me.
Adding concurrency with a thread pool is not possible because, in my particular case, all requests use the same source port (SO_REUSEADDR won't help here either).
For these reasons I'm thinking about the following approach (dropping dnspython's resolver module but still using its message building and parsing modules):
- Allow up to X requests to be in progress at any time
- Send X requests simultaneously (just sending DNS request packets over UDP, probably with a small delay between sends to avoid bursts)
- A different thread waits for responses
- When a response arrives, match it with its request (by address) and allow a new request to run
- If a response to a request does not arrive within TIMEOUT seconds, mark the request as completed and allow a new request to run
My main issues here are:
- How to implement the task timeout easily
- Is it possible to implement it without thread synchronization (e.g. using an event loop)?
- Is there any existing library that can help implement it? It really feels like I'm trying to reinvent the wheel here. I looked into the asyncio module but couldn't figure out a way to take advantage of it for my problem. Note that I don't want to use existing DNS or network libraries, as I need the flexibility of changing core functionality (e.g. using raw sockets, changing DNS header fields, etc.).
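To illustrate the kind of header-level control mentioned above, here is a minimal sketch that builds a DNS query with only the standard struct module instead of dnspython. The names build_query and parse_header and the default flag value are assumptions made for this example, not part of any library.

```python
import struct


def build_query(name, query_id, qtype=1, flags=0x0100):
    """Build a raw DNS query packet (hypothetical helper, not from dnspython).

    qtype=1 asks for an A record; flags=0x0100 sets only the RD bit.
    """
    # Header: ID, flags, QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
    header = struct.pack("!HHHHHH", query_id, flags, 1, 0, 0, 0)
    # Question name: length-prefixed labels terminated by a zero byte
    labels = name.rstrip(".").encode("ascii").split(b".")
    qname = b"".join(struct.pack("!B", len(label)) + label for label in labels)
    qname += b"\x00"
    # QTYPE followed by QCLASS (1 = IN)
    return header + qname + struct.pack("!HH", qtype, 1)


def parse_header(packet):
    """Return (query_id, flags, qdcount, ancount) from the 12-byte header."""
    query_id, flags, qdcount, ancount, _, _ = struct.unpack("!HHHHHH", packet[:12])
    return query_id, flags, qdcount, ancount
```

Because the header is packed by hand, any field (ID, flags, counts) can be changed before the packet is sent.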
Did you try the aiodns package? https://pypi.python.org/pypi/aiodns/
For timeouts, asyncio has the standard wait_for coroutine (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for).
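As a rough sketch of how wait_for could be combined with a single UDP socket: the code below opens one datagram endpoint (so every request leaves from the same source port), matches replies by sender address, and caps the number of in-flight requests with a semaphore. QueryProtocol, query_one and query_all are hypothetical names, the payload is assumed to be an already-built DNS query, and rate limiting between sends is left out.

```python
import asyncio


class QueryProtocol(asyncio.DatagramProtocol):
    """Shared UDP endpoint; replies are matched to requests by sender address."""

    def __init__(self):
        self.transport = None
        self.waiting = {}  # (ip, port) -> future awaiting that server's reply

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        fut = self.waiting.pop(addr, None)
        # Unknown senders, duplicates and late replies are silently dropped
        if fut is not None and not fut.done():
            fut.set_result(data)


async def query_one(protocol, server, payload, timeout, limiter):
    """Return (server, reply_bytes), or (server, None) if the request timed out."""
    async with limiter:  # blocks while max_pending requests are in flight
        fut = asyncio.get_running_loop().create_future()
        # Assumption: each server appears at most once in the input
        protocol.waiting[server] = fut
        protocol.transport.sendto(payload, server)
        try:
            return server, await asyncio.wait_for(fut, timeout)
        except asyncio.TimeoutError:
            return server, None
        finally:
            protocol.waiting.pop(server, None)


async def query_all(servers, payload, local_addr=("0.0.0.0", 0),
                    timeout=2.0, max_pending=10):
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        QueryProtocol, local_addr=local_addr)
    limiter = asyncio.Semaphore(max_pending)
    try:
        return await asyncio.gather(
            *(query_one(protocol, s, payload, timeout, limiter) for s in servers))
    finally:
        transport.close()
```

Running asyncio.run(query_all(servers, packet)) would return a list of (server, reply or None) pairs. Servers have to be given as numeric (ip, port) tuples so that they compare equal to the sender address of the reply.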
Using a simple select loop works well here. Here is a code snippet for completeness:

    import errno
    import socket
    # Assumption: clock() is a wall-clock timer; time.clock() was removed in Python 3.8
    from time import time as clock

    def run(self, resolvers_iter):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
        sock.setblocking(False)
        try:
            pending = []  # (sent_time, resolver_address), oldest first
            # For rate limiting
            limit = float(self.timeout) / self.max_pending  # delay between sends
            last_sent = clock() - limit
            # Work as long as there are more resolvers to query
            completed_sending = False
            while not completed_sending or pending:
                # Can I send more requests?
                want_to_write = False
                if not completed_sending and len(pending) < self.max_pending:
                    want_to_write = True
                # Calculate nearest timeout time to make sure select returns on time
                timeout = None
                if pending:
                    timeout = self.timeout - clock() + pending[0][0] + 0.001
                    timeout = max(timeout, 0)
                # Rate limit
                time_passed_since_send = clock() - last_sent
                if want_to_write and time_passed_since_send + 0.001 < limit:
                    wait = limit - time_passed_since_send
                    # timeout is None when nothing is pending, so min() alone is not safe
                    timeout = wait if timeout is None else min(timeout, wait)
                    timeout = max(timeout, 0)
                    want_to_write = False
                # Poll socket - uses internally the select module
                readable, writable = self._select(readable=True, writable=want_to_write, timeout=timeout)
                # Can read
                if readable:
                    # Read as many as possible
                    while True:
                        try:
                            # Get response
                            response, from_address = DnsFacilities.read_response(sock)
                            # Check if not duplicate or already timed out
                            sent_time = None
                            for i, (t, ip) in enumerate(pending):
                                if ip == from_address[0]:
                                    sent_time = t
                                    del pending[i]
                                    break
                            if sent_time is not None:
                                self.response_received((response, from_address, clock() - sent_time))
                        except socket.error as e:
                            if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                                break  # nothing left to read
                            elif e.errno in (errno.ECONNRESET, errno.ENETRESET):
                                pass  # WSAECONNRESET / WSAENETRESET on Windows; keep reading
                            else:
                                raise
                # Can write
                if writable:
                    try:
                        last_sent = clock()
                        resolver_address = next(resolvers_iter)
                        DnsFacilities.send_query(resolver_address)
                        pending.append((clock(), resolver_address))
                    except StopIteration:
                        completed_sending = True
                # Check for timed out tasks
                now = clock()
                while pending and now - pending[0][0] > self.timeout:
                    self.response_timeout(pending[0][1])
                    del pending[0]
        finally:
            sock.close()
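
The _select helper used in the loop is not shown. One possible shape for it, assuming it simply wraps select.select for a single socket, is sketched below. The name poll_socket and its explicit sock parameter are assumptions for this example.

```python
import select


def poll_socket(sock, readable=True, writable=False, timeout=None):
    """Return (can_read, can_write) for one socket.

    timeout is in seconds; None blocks until the socket becomes ready.
    """
    rlist = [sock] if readable else []
    wlist = [sock] if writable else []
    ready_r, ready_w, _ = select.select(rlist, wlist, [], timeout)
    return bool(ready_r), bool(ready_w)
```

Since the loop always asks for readability, select never receives three empty lists, which matters on Windows where that case is rejected.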