Aggregate a Dask dataframe and produce a dataframe

Posted 2020-04-08 12:36

I have a Dask dataframe that looks like this:

url     referrer    session_id ts                  customer
url1    ref1        xxx        2017-09-15 00:00:00 a.com
url2    ref2        yyy        2017-09-15 00:00:00 a.com
url2    ref3        yyy        2017-09-15 00:00:00 a.com
url1    ref1        xxx        2017-09-15 01:00:00 a.com
url2    ref2        yyy        2017-09-15 01:00:00 a.com

I want to group the data by customer, url, and timestamp, aggregate the other columns, and produce a dataframe that looks like this instead:

customer url    ts                  page_views visitors referrers
a.com    url1   2017-09-15 00:00:00 1          1        [ref1]
a.com    url2   2017-09-15 00:00:00 2          2        [ref2, ref3]

In Spark SQL, I can do this as follows:

select 
    customer,
    url,
    ts,
    count(*) as page_views,
    count(distinct(session_id)) as visitors,
    collect_list(referrer) as referrers
from df
group by customer, url, ts

Is there any way I can do this with Dask dataframes? I tried, but I can only calculate each aggregated column separately, as follows:

# group on timestamp (rounded) and url
grouped = df.groupby(['ts', 'url'])

# calculate page views (count rows in each group)
page_views = grouped.size()

# collect a list of referrer strings per group
# collect a list of referrer strings per group (lists are object dtype, not 'f8')
referrers = grouped['referrer'].apply(list, meta=('referrers', 'object'))

# count unique visitors (distinct session ids, like count(distinct) in the SQL)
visitors = grouped['session_id'].nunique()

But I can't find a good way to combine these into the single dataframe I need.
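
For reference, a sketch of the fallback I'm trying to avoid: computing each aggregate and combining the results in pandas. This assumes the grouped results fit in memory, which defeats the point of using Dask:

import pandas as pd

# compute each aggregate to pandas; all three series share the
# same (ts, url) MultiIndex, so they can be concatenated column-wise
combined = pd.concat(
    [page_views.compute().rename('page_views'),
     visitors.compute().rename('visitors'),
     referrers.compute()],
    axis=1,
).reset_index()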

2 Answers
够拽才男人 · 2020-04-08 12:59

The following does indeed work:

gb = df.groupby(['customer', 'url', 'ts'])
gb.apply(lambda d: pd.DataFrame({'views': len(d),
     'visitors': d.session_id.nunique(),
     'referrers': [d.referrer.tolist()]})).reset_index()

This assumes visitors should be unique, as in the SQL above. You may also wish to define the meta of the output.
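
For example, a minimal sketch of a meta definition for this output (the dtypes here are assumptions; adjust to your data):

import pandas as pd

# hypothetical meta describing the frame returned by the lambda above
meta = pd.DataFrame({'views': pd.Series(dtype='i8'),
                     'visitors': pd.Series(dtype='i8'),
                     'referrers': pd.Series(dtype='object')})

result = gb.apply(lambda d: pd.DataFrame({'views': len(d),
     'visitors': d.session_id.nunique(),
     'referrers': [d.referrer.tolist()]}),
     meta=meta).reset_index()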

Emotional °昔 · 2020-04-08 13:05

The github issue that @j-bennet opened gives an additional option. Based on that issue, we implemented the aggregation as follows:

custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)

To combine it with the count:

dfgp = df.groupby(['ID1', 'ID2'])
df2 = dfgp.agg(custom_agg).assign(cnt=dfgp.size()).reset_index()
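
For completeness, a minimal runnable sketch applying the same pattern to the data from the question (the toy frame and the name collect_set are my own, not from the issue):

import itertools
import pandas as pd
import dask.dataframe as dd

# toy data mirroring the question
pdf = pd.DataFrame({
    'url': ['url1', 'url2', 'url2'],
    'referrer': ['ref1', 'ref2', 'ref3'],
    'session_id': ['xxx', 'yyy', 'yyy'],
    'ts': ['2017-09-15 00:00:00'] * 3,
    'customer': ['a.com'] * 3,
})
df = dd.from_pandas(pdf, npartitions=2)

# build a set of referrers per group within each partition,
# then union the partial sets across partitions
collect_set = dd.Aggregation(
    'collect_set',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)

result = (df.groupby(['customer', 'url', 'ts'])
            .agg({'referrer': collect_set})
            .reset_index()
            .compute())
print(result)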
