Multiprocessing for Pandas Dataframe write to exce

2019-08-30 09:13发布

I have working code to write from a large dataframe to separate sheets in an excel file but it takes a long time about 30-40 minutes. I would like to find a way for it to run faster using multiprocessing.

I tried to rewrite it using multiprocessing so that writing to each excel tab could be done in parallel with multiple processors. The revised code runs without errors but it also is not writing to the excel file properly either. Any suggestions would be helpful.

Original working section of code:

import os
from excel_writer import append_df_to_excel
import pandas as pd

    path = os.path.dirname(
        os.path.abspath(__file__)) + '\\fund_data.xlsx'  # get path to current directory and excel filename for data
    data_cols = df_all.columns.values.tolist()  # Create a list of the columns in the final dataframe
    # print(data_cols)
    for column in data_cols:  # For each column in the dataframe
        df_col = df_all[column].unstack(level = -1)  # unstack so Dates are across the top oldest to newest
        df_col = df_col[df_col.columns[::-1]]  # reorder for dates are newest to oldest
        # print(df_col)
        append_df_to_excel(path, df_col, sheet_name = column, truncate_sheet = True,
                           startrow = 0)  # Add data to excel file

Revised code trying multiprocessing:

import os
from excel_writer import append_df_to_excel
import pandas as pd
import multiprocessing

def data_to_excel(col, excel_fn, data):
    data_fr = pd.DataFrame(data)        # switch list back to dataframe for putting into excel file sheets
    append_df_to_excel(excel_fn, data_fr, sheet_name = col, truncate_sheet = True, startrow = 0)    # Add data to sheet in excel file

if __name__ == "__main__":
    path = os.path.dirname(
        os.path.abspath(__file__)) + '\\fund_data.xlsx'  # get path to current directory and excel filename for data
    data_cols = df_all.columns.values.tolist()  # Create a list of the columns in the final dataframe
    # print(data_cols)
    pool = multiprocessing.Pool(processes = multiprocessing.cpu_count())

    for column in data_cols:  # For each column in the dataframe
        df_col = df_all[column].unstack(level = -1)  # unstack so Dates are across the top oldest to newest
        df_col = df_col[df_col.columns[::-1]]  # reorder for dates are newest to oldest
        # print(df_col)
        data_col = df_col.values.tolist()      # convert dataframe coluumn to a list to use in pool
        pool.apply_async(data_to_excel, args = (column, path, data_col))
    pool.close()
    pool.join()

1条回答
干净又极端
2楼-- · 2019-08-30 09:45

I do not know proper way to write to single file from multiple process. I need to solve similar problem. I solve it with creation writer process which gets data using Queue. You can see my solution here (sorry it is not documented).

Simplified version (draft)

from multiprocessing import Queue
input_queue = Queue()
res_queue = Queue()
process_list = []

def do_calculation(input_queue, res_queue, calculate_function):
    try:
        while True:
            data = in_queue.get(False)
            try:
                res = calculate_function(**data)
                out_queue.put(res)
            except ValueError as e:
                out_queue.put("fail")
                logging.error(f" fail on {data}")
    except queue.Empty:
        return

# put data in input queue 

def save_process(out_queue, file_path, count):
    for i in range(count):
        data = out_queue.get()
        if data == "fail":
            continue 
        # write to excel here

for i in range(process_num):
    p = Process(target=do_calculation, args=(input_queue, res_queue, calculate_function))
    p.start()
    process_list.append(p)

p2 = Process(target=save_process, args=(res_queue, path_to_excel, data_size))
p2.start()
p2.join()
for p in process_list:
    p.join()
查看更多
登录 后发表回答