Can rpy2 code be run in parallel?

Posted 2019-07-31 10:30

I have some Python code that passes a data frame to R via rpy2, whereupon R processes it and I pull the resulting data.frame back into Python as a pandas data frame via com.load_data.

The call to com.load_data works fine in a single Python process, but it crashes when the same code is run concurrently in several multiprocessing.Process processes. I get the following error message out of Python:

File "C:\\Python27\\lib\\site-packages\\pandas\\rpy\\common.py", line 29, in load_data
    r.data(name)
TypeError: 'DataFrame' object is not callable'

So my question is: is rpy2 simply not designed to be run in parallel, or is this merely a bug in the load_data function? I had assumed that each Python process would get its own independent R session. As far as I can tell, the only workaround would be to have R write the output to a text file, which the appropriate Python process can then open to continue its processing. But this is pretty clunky.
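For reference, the Python side of that file-based workaround might look roughly like the sketch below. It is only an illustration under stated assumptions: Python 3, a hypothetical per-process file name built from the worker's PID (so that concurrent workers do not overwrite each other's output), and an R side that writes the predictions with something like write.csv(preds, path, row.names = FALSE). The helper names worker_output_path and read_predictions are made up for this example.

```python
import csv
import os
import tempfile


def worker_output_path(directory):
    """Return a CSV path unique to the calling process (hypothetical naming scheme)."""
    return os.path.join(directory, "preds_{}.csv".format(os.getpid()))


def read_predictions(path):
    """Read a CSV file written by R into a list of dicts, one per row."""
    with open(path, newline="") as handle:
        return list(csv.DictReader(handle))


if __name__ == "__main__":
    out_dir = tempfile.mkdtemp()
    path = worker_output_path(out_dir)
    # Stand-in for the R side, which is assumed to run something like
    #   write.csv(preds, "<path>", row.names = FALSE)
    with open(path, "w", newline="") as handle:
        handle.write('"preds"\n"yes"\n"no"\n')
    print(read_predictions(path))
```

Every value comes back as a string here, so numeric columns would still need converting before further processing.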

Update with some code:

from rpy2.robjects.packages import importr
import rpy2.robjects as ro
import pandas as pd
import pandas.rpy.common as com

# Load C50 library into R environment
C50 = importr('C50')

...

# PANDAS data frame containing test dataset
testing = pd.DataFrame(testing)

# Pass testing dataset to R
rtesting = com.convert_to_r_dataframe(testing)
ro.globalenv['test'] = rtesting

# Strip "AsIs" from each column in the R data frame
# so that predict.C5.0 will work
for c in range(len(testing.columns)):
    ro.r('''class(test[,{0}])=class(test[,{0}])[-match("AsIs", class(test[,{0}]))]'''.format(c+1))

# Make predictions on test dataset (res is pre-existing C5.0 tree)
ro.r('''preds=predict.C5.0(res, newdata=test)''')
ro.r('''preds=as.data.frame(preds)''')

# Get the predictions from R
preds = com.load_data('preds') ### Crashes here when code is run on several processes concurrently

# Further processing as necessary
...

Tags: python r rpy2
2 Answers
趁早两清
Reply #2 · 2019-07-31 10:39

rpy works by running a Python process and an R process in parallel and exchanging information between them. It does not take into account that R calls may be issued in parallel through multiprocessing. So in practice, each of the Python processes connects to the same R process. This is probably what causes the issues you see.

One way to circumvent this issue is to implement the parallel processing in R rather than in Python. You then send everything to R at once, R processes it in parallel, and the result is sent back to Python.

chillily
Reply #3 · 2019-07-31 11:01

The following (Python 3) code suggests that, at least when a multiprocessing.Pool is used, a separate R process is spawned for each worker process (@lgautier, is this right?):

import multiprocessing
import os
import time

import rpy2.robjects as robjects

num_processes = 3

def test_r_process(pause):
    n_called = robjects.r("times.called <- times.called + 1")[0]
    r_pid = robjects.r("Sys.getpid()")[0]
    print("R process for worker {} is {}. Pausing for {} seconds.".format(
        os.getpid(), r_pid, pause))
    time.sleep(pause)
    return r_pid, n_called


pause_secs = [2,4,3,6,7,2,3,5,1,2,3,3]
results = {}
robjects.r("times.called <- 0")
with multiprocessing.Pool(processes=num_processes) as pool:
    for proc, n_called in pool.imap_unordered(test_r_process, pause_secs):
        results[proc]=max(n_called, results.get(proc) or 0)
print("The test function should have been called {} times".format(len(pause_secs)))
for pid,called in results.items():
    print("R process {} was called {} times".format(pid,called))

On my OS X laptop, this results in something like:

R process for worker 22535 is 22535. Pausing for 3 seconds.
R process for worker 22533 is 22533. Pausing for 2 seconds.
R process for worker 22533 is 22533. Pausing for 6 seconds.
R process for worker 22535 is 22535. Pausing for 7 seconds.
R process for worker 22534 is 22534. Pausing for 2 seconds.
R process for worker 22534 is 22534. Pausing for 3 seconds.
R process for worker 22533 is 22533. Pausing for 5 seconds.
R process for worker 22534 is 22534. Pausing for 1 seconds.
R process for worker 22535 is 22535. Pausing for 2 seconds.
R process for worker 22534 is 22534. Pausing for 3 seconds.
R process for worker 22535 is 22535. Pausing for 3 seconds.
The test function should have been called 12 times
R process 22533 was called 3.0 times
R process 22534 was called 5.0 times
R process 22535 was called 4.0 times
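In the output above, the R PID always equals the worker PID, which is consistent with R running embedded inside each Python worker rather than as one shared external process. The same per-process isolation can be reproduced without R at all. The sketch below is only an analogy: a plain Python module-level counter (times_called, a stand-in for R's times.called) is incremented inside pool workers, and the helper names record_call and calls_per_worker are hypothetical.

```python
import multiprocessing
import os

# Stand-in for interpreter-level state such as R's `times.called`.
times_called = 0


def record_call(_task):
    """Increment this process's private counter and report (pid, count)."""
    global times_called
    times_called += 1
    return os.getpid(), times_called


def calls_per_worker(n_tasks, n_workers):
    """Run n_tasks in a pool; return {worker pid: highest count seen}."""
    highest = {}
    with multiprocessing.Pool(processes=n_workers) as pool:
        for pid, count in pool.imap_unordered(record_call, range(n_tasks)):
            highest[pid] = max(count, highest.get(pid, 0))
    return highest


if __name__ == "__main__":
    per_worker = calls_per_worker(n_tasks=12, n_workers=3)
    print(per_worker)
    # Each worker counts only its own calls, so the per-worker
    # maxima add up to the total number of tasks.
    print(sum(per_worker.values()))
```

If the workers shared a single counter, the maxima would overlap instead of adding up to the task count. The R test above shows the same pattern (3 + 5 + 4 = 12).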