A better way to load MongoDB data to a DataFrame using Pandas and PyMongo?

Published 2019-02-03 16:26

Question:

I have a 0.7 GB MongoDB database containing tweets that I'm trying to load into a dataframe. However, I get an error.

MemoryError:    

My code looks like this:

from pandas import DataFrame

cursor = tweets.find()  # where tweets is my collection
tweet_fields = ['id']
result = DataFrame(list(cursor), columns=tweet_fields)

I've tried the methods in the following answers, which at some point create a list of all the elements of the database before loading it.

  • https://stackoverflow.com/a/17805626/2297475
  • https://stackoverflow.com/a/16255680/2297475

However, another answer that discusses list() points out that it is only suitable for small data sets, because everything is loaded into memory.

  • https://stackoverflow.com/a/13215411/2297475

In my case, I think it's the source of the error. It's too much data to be loaded into memory. What other method can I use?

Answer 1:

I've modified my code to the following:

cursor = tweets.find(fields=['id'])
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)

By adding the fields parameter to find(), I restricted the output: instead of loading every field, only the selected fields are loaded into the DataFrame. Everything works fine now.
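As a side note for newer setups, the fields keyword belongs to PyMongo 2.x; in PyMongo 3+ the same restriction is expressed with the projection argument. A minimal sketch under that assumption (the connection, database, and collection names here are placeholders):

from pandas import DataFrame
from pymongo import MongoClient

client = MongoClient()                   # assumes a local MongoDB instance
tweets = client["twitter_db"]["tweets"]  # hypothetical database/collection names

# return only the 'id' field (and drop '_id') so each document stays small
cursor = tweets.find({}, projection={"_id": 0, "id": 1})
result = DataFrame(list(cursor), columns=["id"])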



Answer 2:

The fastest, and likely most memory-efficient, way to create a DataFrame from a MongoDB query like yours would be to use monary.

This post has a nice and concise explanation.
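For reference, a minimal sketch of what a Monary query might look like, assuming a local MongoDB instance and hypothetical database, collection, and field names (Monary bypasses Python dicts and returns NumPy arrays directly, which is where the speed and memory savings come from):

from monary import Monary
import numpy as np
import pandas as pd

client = Monary("localhost")  # assumes a local MongoDB instance

# query(db, collection, filter, field names, field types) -> one masked NumPy array per field
columns = client.query("twitter_db", "tweets", {}, ["id"], ["int64"])

df = pd.DataFrame({"id": np.asarray(columns[0])})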



Answer 3:

An elegant way of doing it would be as follows:

import pandas as pd
from pymongo import ReplaceOne

def my_transform_logic(x):
    # placeholder: apply whatever per-value processing you need
    if x:
        result = x  # do_something(x)
        return result
    return None

def process(cursor):
    # 'db' is assumed to be an existing pymongo Database handle
    df = pd.DataFrame(list(cursor))
    df['result_col'] = df['col_to_be_processed'].apply(my_transform_logic)

    # make a list of dictionaries and insert the processed rows
    db.collection_name.insert_many(df.to_dict('records'))

    # or upsert instead: update_many expects a filter and an update document,
    # so per-record upserts are done as a bulk write of ReplaceOne operations
    db.collection_name.bulk_write(
        [ReplaceOne({'_id': rec['_id']}, rec, upsert=True)
         for rec in df.to_dict('records')])


# make a list of cursors; see the parallel_scan API of pymongo
# (note: parallel_scan is deprecated in newer PyMongo/MongoDB versions)

cursors = mongo_collection.parallel_scan(6)
for cursor in cursors:
    process(cursor)

I tried the above process on a MongoDB collection with 2.6 million records, using Joblib to run the code above. It didn't throw any memory errors, and the processing finished in 2 hours.
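For completeness, a minimal sketch of how Joblib might drive process() over the cursors; this is an assumption about the setup described above rather than the answerer's exact code, and it uses the threading backend because PyMongo cursors cannot be pickled and sent to worker processes:

from joblib import Parallel, delayed

# run one job per cursor; threads can share the cursors, separate processes could not
Parallel(n_jobs=6, backend="threading")(
    delayed(process)(cursor) for cursor in cursors)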