优化:从流API蒙戈倾销JSON(Optimization: Dumping JSON from a

2019-07-29 17:03发布

背景:我有一个python模块设置抓住从流API JSON对象,并使用pymongo存储它们(一次25批量插入)在MongoDB中。 为了便于比较,我也有一个bash命令来curl来自同一数据流的API和pipemongoimport 。 无论是在单独的集合,这些方法存储数据。

每隔一段时间,我监视count()的集合,以检查他们表现如何。

到目前为止,我看到了python模块由后面约1000 JSON对象落后curl | mongoimport curl | mongoimport方法。

问题:我如何才能优化我python模块为〜与同步curl | mongoimport curl | mongoimport

我不能用tweetstream ,因为我不使用Twitter的API,但第三方流媒体服务。

可能有人请帮助我在这里?

Python模块:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

谢谢阅读。

Answer 1:

原来有你的代码中的错误。

                if self.chunk_count % 50 == 0
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0

您重置chunk_count但你不重置tweet_list。 所以,第二次通过尝试插入100个项目(50个新加50已经发送之前到DB的时间)。 你已经解决了这个问题,但仍然可以看到在性能上的差异。

整批大小的东西原来是一个红鲱鱼。 我试着使用JSON的大文件,并且通过mongoimport它通过Python与负荷加载它和Python总是快(即使在安全模式 - 见下文)。

以你的代码仔细看看,我意识到这个问题是一个事实,即流API实际上是交给你的块数据。 我们希望你只是把这些大块,放到数据库中(这是mongoimport正在做)。 你的Python是做分裂流,将其添加到列表,然后定期发送批量蒙戈额外的工作,可能是与我所看到的,你看看有什么区别。

试试这个片断为您handle_data()

def handle_data(self, data):
    try:
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
    try:
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)

需要注意的一点是,你的蟒蛇插入未在“安全模式”运行 -你应该改变通过添加参数safe=True到您的插入语句。 然后,您将获得失败和你的try / catch将打印错误暴露出问题的任何插入一个例外。

它不花费在性能上,所以尽管 - 我目前正在运行的测试和大约五分钟后,两个集合的大小是14120 14113。



Answer 2:

摆脱了StringIO的库​​。 由于WRITEFUNCTION回调handle_data ,在这种情况下,被调用的每一行,只需加载JSON直接。 但是,有时有可能有两个JSON包含的数据对象。 我很抱歉,我不能发布的curl我用,因为它包含了我们的凭据命令。 但是,正如我所说,这是适用于任何流API的一般问题。


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    


文章来源: Optimization: Dumping JSON from a Streaming API to Mongo