When loading data from CSV files, some CSVs fail to load, resulting in empty partitions. I would like to remove all empty partitions, since some methods do not seem to work well with them. I have tried repartitioning: for example, repartition(npartitions=10) works, but any value greater than this can still leave empty partitions.
What's the best way of achieving this? Thanks.
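A minimal, made-up example of what I mean (using a filter as a stand-in for the failed CSV loads):

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=4)
filtered = df[df.x < 2]   # only the first partition keeps any rows
print(filtered.map_partitions(len).compute().tolist())   # e.g. [2, 0, 0, 0]
smaller = filtered.repartition(npartitions=3)
print(smaller.map_partitions(len).compute().tolist())    # empty partitions can remain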
I've found that filtering a Dask dataframe, e.g., by date, often results in empty partitions. If you're having trouble using a dataframe with empty partitions, here's a function, based on MRocklin's guidance, to cull them:
import dask.dataframe as dd

def cull_empty_partitions(df):
    """Drop empty partitions from a Dask DataFrame."""
    # Compute the length of every partition.
    ll = list(df.map_partitions(len).compute())
    df_delayed = df.to_delayed()
    df_delayed_new = list()
    pempty = None
    for ix, n in enumerate(ll):
        if n == 0:
            # Remember one empty partition to reuse as metadata below.
            pempty = df.get_partition(ix)
        else:
            df_delayed_new.append(df_delayed[ix])
    if pempty is not None:
        df = dd.from_delayed(df_delayed_new, meta=pempty)
    return df
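For example, with some made-up data:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=5)
ddf = ddf[ddf.a >= 8]                 # leaves all but the last partition empty
ddf = cull_empty_partitions(ddf)
print(ddf.npartitions)                # 1 (the empty partitions are gone)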
For anyone working with Bags (not DataFrames), this function will do the trick:
import dask.bag

def cull_empty_partitions(bag):
    """
    When a bag is created by filtering or grouping another bag,
    it retains the original bag's partition count, even if many of
    the partitions become empty.
    Those extra partitions add overhead, so it's nice to discard them.
    This function drops the empty partitions.
    """
    bag = bag.persist()
    def get_len(partition):
        # If the bag is the result of bag.filter(),
        # then each partition is actually a 'filter' object,
        # which has no __len__.
        # In that case, we must convert it to a list first.
        if hasattr(partition, '__len__'):
            return len(partition)
        return len(list(partition))
    partition_lengths = bag.map_partitions(get_len).compute()
    # Convert bag partitions into a list of 'delayed' objects.
    lengths_and_partitions = zip(partition_lengths, bag.to_delayed())
    # Drop the empty partitions.
    partitions = (p for l, p in lengths_and_partitions if l > 0)
    # Convert from delayed objects back into a Bag.
    return dask.bag.from_delayed(partitions)
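For example:

import dask.bag as db

bag = db.from_sequence(range(100), npartitions=10)
bag = bag.filter(lambda x: x < 5)    # only the first partition keeps elements
bag = cull_empty_partitions(bag)
print(bag.npartitions)               # 1 (the empty partitions are gone)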
There is no simple API to do this. You could call df.map_partitions(len) to determine which partitions are empty and then remove them explicitly, perhaps by using df.to_delayed() and dask.dataframe.from_delayed(...).
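A minimal sketch of that approach, assuming df is an existing Dask DataFrame (df._meta, dask's internal metadata attribute, is used here to preserve the schema):

import dask.dataframe as dd

lengths = df.map_partitions(len).compute()       # rows per partition
nonempty = [part for part, n in zip(df.to_delayed(), lengths) if n > 0]
df = dd.from_delayed(nonempty, meta=df._meta)    # _meta carries the schema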
In the future, if you are willing to raise issues when you find a function that does not work well with empty partitions, that would be greatly appreciated: https://github.com/dask/dask/issues/new