Tweepy Connection broken: IncompleteRead - best way to handle?

Posted 2020-07-23 03:44

Question:

I am using tweepy to handle a large Twitter stream (following 4,000+ accounts). The more accounts I add to the stream, the more likely I am to get this error:

Traceback (most recent call last):
  File "myscript.py", line 2103, in <module>
    main()
  File "myscript.py", line 2091, in main
    twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
  File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 445, in filter
    self._start(async)
  File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 361, in _start
    self._run()
  File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 294, in _run
    raise exception
requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 2000 more expected)', IncompleteRead(0 bytes read, 2000 more expected))

Obviously that is a thick firehose - empirically, too thick for my script to handle. Based on researching this error on Stack Overflow, as well as the trend that the more accounts I follow, the faster this exception occurs, my hypothesis is that this is 'my fault': my processing of each tweet takes too long and/or my firehose is too thick. I get that.

But notwithstanding that setup, I still have two questions that I can't seem to find solid answers for.

1. Is there a way to simply 'handle' this exception, accept that I will miss some tweets, and keep the script running? I figure it misses a tweet (or many tweets), but if I can live without 100% of the tweets I want, then the script/stream can still go on, ready to catch the next tweet whenever it can.

I've tried this exception handling, which was recommended for this situation in a similar question on Stack Overflow:

    from urllib3.exceptions import ProtocolError

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except ProtocolError:
            continue

But unfortunately for me (perhaps I implemented it incorrectly, though I don't think I did), that did not work. I get the exact same error, with or without that recommended exception-handling code in place.
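One detail worth checking here: the traceback shows the exception class coming from requests' bundled copy of urllib3 (requests.packages.urllib3.exceptions.ProtocolError). Depending on the requests version, that can be a different class from the standalone urllib3.exceptions.ProtocolError, in which case the except clause above will never match. A rough sketch that catches the class actually named in the traceback, reusing twitter_stream and USERS_TO_FOLLOW_STRING_LIST from above:

    # Sketch only: catch both the standalone and the requests-bundled ProtocolError,
    # since older versions of requests ship their own copy of urllib3.
    from urllib3.exceptions import ProtocolError
    from requests.packages.urllib3.exceptions import ProtocolError as VendoredProtocolError

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except (ProtocolError, VendoredProtocolError):
            # The read was cut off mid-response; some tweets are lost,
            # but reconnect and keep streaming.
            continue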

2. I have never implemented queues and/or threading in my Python code. Would this be a good time for me to try to implement that? I don't know everything about queues/threading, but I am imagining...

Could I have the tweets written raw, before any processing, to memory, or a database, or something, on one thread? And then have a second thread ready to process those tweets as soon as it can? That way, at least, my post-processing of each tweet is taken out of the equation as a limiting factor on the bandwidth of the firehose I am reading. Then, if I still get the error, I can cut back on who I am following, etc.

I have watched some threading tutorials, but figured it might be worth asking whether that approach 'works' with this tweepy/Twitter setup. I am not confident in my understanding of the problem I have or of how threading might help, so I figured I would ask for advice on whether it would indeed help me here.

If this idea is valid, is there a simple piece of example code someone could share to point me in the right direction?

Answer 1:

I think I solved this problem by finally completing my first queue/thread implementation. I am not experienced enough to know whether this is the best way to do it, but I think it works. Using the code below, I now build up a queue of new tweets and can handle them from the queue as I wish, rather than falling behind and losing my connection with tweepy.

import tweepy

from Queue import Queue      # Python 2; this module is named queue in Python 3
from threading import Thread


class My_Parser(tweepy.StreamListener):

    def __init__(self, q=Queue()):
        num_worker_threads = 4
        self.q = q
        # Start daemon worker threads that process tweets off the queue,
        # so on_data only has to enqueue and can return immediately.
        for i in range(num_worker_threads):
            t = Thread(target=self.do_stuff)
            t.daemon = True
            t.start()

    def on_data(self, data):
        # Called on the streaming thread: just hand the raw tweet to the queue.
        self.q.put(data)

    def do_stuff(self):
        # Worker loop: pull raw tweets off the queue and process them.
        while True:
            do_whatever(self.q.get())  # do_whatever is a placeholder for your own processing
            self.q.task_done()

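For completeness, a rough sketch of how a listener like this might be wired into a stream and combined with the reconnect loop from the question. The credential variables are placeholders, and it assumes the same tweepy 3.x / Python 2 setup as above:

    import tweepy
    from requests.packages.urllib3.exceptions import ProtocolError

    # Placeholder credentials - fill in your own keys and tokens.
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    listener = My_Parser()
    twitter_stream = tweepy.Stream(auth, listener)

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except ProtocolError:
            # Connection cut off mid-read; some tweets are lost,
            # but reconnect and keep streaming.
            continue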
I did continue digging into the IncompleteRead error for a while, and I tried numerous other exception-handling solutions using the urllib and http libraries, but I struggled with that. And I think there are benefits to the queueing approach anyway, beyond just keeping the connection alive (for one, you won't lose data).

Hopefully this is helpful to someone. haha.



Answer 2:

Thank you a lot, man. I was facing a problem like this and had tried all kinds of solutions. It was happening because, besides streaming from the API, I was doing a lot of processing with the data, and this made me lose the connection. I just made some adjustments to your approach: I had to add super().__init__() in the __init__ method because I am using on_status, and the Queue import must be lowercase (queue) in Python 3. Also, I didn't write a do_whatever; I just put self.q.get() inside the while loop. Anyway, it works perfectly, thanks a lot again.

The final code:

import tweepy

from queue import Queue      # lowercase module name in Python 3
from threading import Thread


class Listener(tweepy.StreamListener):

    def __init__(self, q=Queue()):
        super().__init__()
        self.q = q
        # Start daemon worker threads that drain the queue in the background.
        for i in range(4):
            t = Thread(target=self.do_stuff)
            t.daemon = True
            t.start()

    def on_status(self, status):
        # <my code here>
        self.q.put(status)    # hand the status off to the worker threads

    def do_stuff(self):
        while True:
            self.q.get()
            self.q.task_done()
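And, in case it helps anyone wiring this up, the same idea for this Python 3 version: attach the listener to a stream and restart the filter when the connection drops. The credential names and accounts_to_follow are placeholders, and this assumes tweepy 3.x, where StreamListener still exists:

    import tweepy
    from urllib3.exceptions import ProtocolError

    # Placeholder credentials.
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    stream = tweepy.Stream(auth, Listener())

    while True:
        try:
            # accounts_to_follow: a list of user id strings
            stream.filter(follow=accounts_to_follow, stall_warnings=True)
        except ProtocolError:
            continue  # connection dropped mid-read; reconnect and keep going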