我有通过经由rpy2数据帧到R,于是ř处理它一些Python代码和我拉所得data.frame回R作为经由PANDAS数据帧com.load_data
。
问题是,调用com.load_data
工作正常在一个Python过程,但是当同样的一串代码在几个运行它崩溃multiprocessing.Process
过程同时进行。 我收到以下错误消息出来的Python:
File "C:\\Python27\\lib\\site-packages\\pandas\\rpy\\common.py", line 29, in load_data
r.data(name) TypeError: 'DataFrame' object is not callable'
所以我的问题是,是不是rpy2实际上设计为能够并行运行,或者是它只是在一个错误load_data
功能? 我只是认为每个Python进程会得到它自己自主研发的会话。 据我所知道的,唯一的解决方法是具有R写入输出到一个文本文件,该文件的适当的Python程序可以打开和其处理下去。 但是,这是相当沉闷。
一些代码更新:
from rpy2.robjects.packages import importr
import rpy2.robjects as ro
import pandas as pd
import pandas.rpy.common as com
# Load C50 library into R environment
C50 = importr('C50')
...
# PANDAS data frame containing test dataset
testing = pd.DataFrame(testing)
# Pass testing dataset to R
rtesting = com.convert_to_r_dataframe(testing)
ro.globalenv['test'] = rtesting
# Strip "AsIs" from each column in the R data frame
# so that predict.C5.0 will work
for c in range(len(testing.columns)):
ro.r('''class(test[,{0}])=class(test[,{0}])[-match("AsIs", class(test[,{0}]))]'''.format(c+1))
# Make predictions on test dataset (res is pre-existing C5.0 tree)
ro.r('''preds=predict.C5.0(res, newdata=test)''')
ro.r('''preds=as.data.frame(preds)''')
# Get the predictions from R
preds = com.load_data('preds') ### Crashes here when code is run on several processes concurrently
#Further processing as necessary
...
rpy
作品通过运行Python进程和并联的R过程中,并在它们之间交换信息。 它没有考虑到的是,R调用称为并行使用multiprocess
。 因此,在实践中,每个蟒过程的连接到相同的R-处理。 这可能会导致你看到的问题。
绕过这个问题的方法之一是实现在R,而不是在Python的并行处理。 然后,您可以一次发送一切R,这将并行地处理它,其结果将被送回到Python。
以下(python3)码表明,至少如果使用multiprocessing.Pool,单独的R过程正在催生每个工作进程(@lgautier这是正确的?)
import os
import multiprocessing
import time
num_processes = 3
import rpy2.robjects as robjects
def test_r_process(pause):
n_called = robjects.r("times.called <- times.called + 1")[0]
r_pid = robjects.r("Sys.getpid()")[0]
print("R process for worker {} is {}. Pausing for {} seconds.".format(
os.getpid(), r_pid, pause))
time.sleep(pause)
return(r_pid, n_called)
pause_secs = [2,4,3,6,7,2,3,5,1,2,3,3]
results = {}
robjects.r("times.called <- 0")
with multiprocessing.Pool(processes=num_processes) as pool:
for proc, n_called in pool.imap_unordered(test_r_process, pause_secs):
results[proc]=max(n_called, results.get(proc) or 0)
print("The test function should have been called {} times".format(len(pause_secs)))
for pid,called in results.items():
print("R process {} was called {} times".format(pid,called))
对像我的OS X的笔记本电脑结果
R process for worker 22535 is 22535. Pausing for 3 seconds.
R process for worker 22533 is 22533. Pausing for 2 seconds.
R process for worker 22533 is 22533. Pausing for 6 seconds.
R process for worker 22535 is 22535. Pausing for 7 seconds.
R process for worker 22534 is 22534. Pausing for 2 seconds.
R process for worker 22534 is 22534. Pausing for 3 seconds.
R process for worker 22533 is 22533. Pausing for 5 seconds.
R process for worker 22534 is 22534. Pausing for 1 seconds.
R process for worker 22535 is 22535. Pausing for 2 seconds.
R process for worker 22534 is 22534. Pausing for 3 seconds.
R process for worker 22535 is 22535. Pausing for 3 seconds.
The test function should have been called 12 times
R process 22533 was called 3.0 times
R process 22534 was called 5.0 times
R process 22535 was called 4.0 times