并联高效地将函数应用于分组的熊猫数据帧(Efficiently applying a functio

2019-06-26 15:46发布

我经常需要的功能应用到一个非常大的群体DataFrame (混合数据类型),并想利用多核心的优势。

我可以创建从基的迭代器,并使用多处理模块,但它是有效的,因为每一个组与函数的结果必须被腌渍进程之间的消息传递。

有没有办法避免酸洗甚至避免的复制DataFrame完全? 它看起来像多处理模块的共享存储器的功能仅限于numpy阵列。 是否还有其他选择吗?

Answer 1:

从上面的意见,看来这个计划在pandas一段时间(也有一个有趣的前瞻性rosetta项目 ,我只注意到)。

然而,直到每一个并行的功能被结合到pandas ,我注意到,它很容易编写高效&非存储器复制平行扩增以pandas直接使用cython + OpenMP的和C ++。

下面是写一个平行GROUPBY-总和,其使用像这样的一个简单的例子:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

输出是:

     sum
key     
0      6
1      11
2      4

注意无疑,这个简单的例子的功能最终将部分pandas 。 有些事情,然而,会更自然的C ++并行一段时间,要知道它是多么容易此合并成是很重要的pandas


要做到这一点,我写了一个简单的单源文件扩展其代码如下。

它开始与一些进口和类型定义

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

在C ++ unordered_map类型是由单个线程求和,并且vector为所有线程求和。

现在的功能sum 。 它开始了与类型内存意见以便快速访问:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

的功能继续通过将半同样的线(此处硬编码到4),以及具有每个线程总和在其范围内的条目:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

当线程完成,该函数合并所有的结果(来自不同范围)到单个unordered_map

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

剩下要做的事情是创建一个DataFrame和返回的结果:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df


文章来源: Efficiently applying a function to a grouped pandas DataFrame in parallel