I have a Dask dataframe that looks like this:
url   referrer  session_id  ts                   customer
url1  ref1      xxx         2017-09-15 00:00:00  a.com
url2  ref2      yyy         2017-09-15 00:00:00  a.com
url2  ref3      yyy         2017-09-15 00:00:00  a.com
url1  ref1      xxx         2017-09-15 01:00:00  a.com
url2  ref2      yyy         2017-09-15 01:00:00  a.com
I want to group the data on customer, url and timestamp, aggregate the column values, and produce a dataframe that looks like this instead:
customer  url   ts                   page_views  visitors  referrers
a.com     url1  2017-09-15 00:00:00  1           1         [ref1]
a.com     url2  2017-09-15 00:00:00  2           2         [ref2, ref3]
In Spark SQL, I can do this as follows:
select
customer,
url,
ts,
count(*) as page_views,
count(distinct(session_id)) as visitors,
collect_list(referrer) as referrers
from df
group by customer, url, ts
Is there any way I can do it with Dask dataframes? I tried, but I can only calculate the aggregated columns separately, as follows:
# group on timestamp (rounded) and url
grouped = df.groupby(['ts', 'url'])
# calculate page views (count rows in each group)
page_views = grouped.size()
# collect a list of referrer strings per group
referrers = grouped['referrer'].apply(list, meta=('referrers', 'object'))
# count unique visitors (session ids)
visitors = grouped['session_id'].count()
But I can't seem to find a good way to produce the combined dataframe that I need.
The following does indeed work (assuming visitors should be unique, as per the SQL above). You may wish to define the meta of the output; a sketch along those lines is given below.
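A minimal sketch of that combined aggregation, assuming a single groupby over customer, url and ts followed by an apply that builds all three aggregates per group (the summarize helper and the meta spec here are illustrative, not taken verbatim from the answer):

import pandas as pd
import dask.dataframe as dd

def summarize(group):
    # Build one output row per (customer, url, ts) group.
    return pd.Series({
        'page_views': len(group),                   # count(*)
        'visitors': group['session_id'].nunique(),  # count(distinct session_id)
        'referrers': list(group['referrer']),       # collect_list(referrer)
    })

result = (
    df.groupby(['customer', 'url', 'ts'])
      .apply(summarize, meta={'page_views': 'i8', 'visitors': 'i8', 'referrers': 'object'})
      .reset_index()
)

Note that a groupby-apply in Dask involves a shuffle, so it is more expensive than the built-in aggregations, but it computes all three columns in a single pass.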
The github issue that @j-bennet opened gives an additional option. Based on that issue, we implemented the aggregation as follows:
import itertools
import dask.dataframe as dd

custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),  # chunk: collect a set of values per group within each partition
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),  # agg: union the per-partition sets into one de-duplicated list
)
In order to combine this with the count, the code is as follows:
dfgp = df.groupby(['ID1', 'ID2'])
df2 = dfgp.assign(cnt=dfgp.size()).agg(custom_agg).reset_index()
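Applied back to the question's columns, the same pattern might look roughly like the sketch below. The names collect_unique and count_unique are made up for illustration, and passing dd.Aggregation objects inside a dict to .agg is assumed to be supported by the Dask version in use, so treat this as an outline rather than tested code:

import itertools
import dask.dataframe as dd

# Collect the distinct referrer strings per group as a list.
collect_unique = dd.Aggregation(
    'collect_unique',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)

# Count the distinct session ids per group (the visitors column).
count_unique = dd.Aggregation(
    'count_unique',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: len(set(itertools.chain.from_iterable(chunks)))),
)

result = (
    df.groupby(['customer', 'url', 'ts'])
      .agg({'session_id': count_unique, 'referrer': collect_unique})
      .reset_index()
      .rename(columns={'session_id': 'visitors', 'referrer': 'referrers'})
)

The page_views count can then be attached separately, for example from grouped.size() as in the question, or with the assign(cnt=...) trick shown above.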