Remove empty partitions in Dask

Published 2020-07-18 09:46

Question:

When loading data from CSV files, some files cannot be loaded, which results in empty partitions. I would like to remove all empty partitions, because some methods do not seem to work well with them. I have tried repartitioning: repartition(npartitions=10), for example, works, but a larger value can still leave empty partitions.

What's the best way of achieving this? Thanks.

Answer 1:

I've found that filtering a Dask dataframe, e.g., by date, often results in empty partitions. If you're having trouble using a dataframe with empty partitions, here's a function, based on MRocklin's guidance, to cull them:

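import dask.dataframe as dd

def cull_empty_partitions(df):
    # Count the rows in every partition (this triggers a compute).
    lengths = df.map_partitions(len).compute()
    partitions = df.to_delayed()
    # Keep only the delayed partitions that contain at least one row.
    non_empty = [p for p, n in zip(partitions, lengths) if n > 0]
    if not non_empty or len(non_empty) == len(partitions):
        # Nothing to drop, or nothing would be left: return df unchanged.
        return df
    # Rebuild the dataframe, reusing the original metadata.
    return dd.from_delayed(non_empty, meta=df._meta)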


Answer 2:

For anyone working with Bags (not DataFrames), this function will do the trick:

import dask.bag

def cull_empty_partitions(bag):
    """
    When a bag is created by filtering or grouping another bag,
    it retains the original bag's partition count, even if many of the
    partitions become empty.
    Those extra partitions add overhead, so it's nice to discard them.
    This function drops the empty partitions.
    """
    bag = bag.persist()
    def get_len(partition):
        # If the bag is the result of bag.filter(),
        # then each partition is actually a 'filter' object,
        # which has no __len__.
        # In that case, we must convert it to a list first.
        if hasattr(partition, '__len__'):
            return len(partition)
        return len(list(partition))
    partition_lengths = bag.map_partitions(get_len).compute()

    # Pair each partition's length with its 'delayed' object
    lengths_and_partitions = zip(partition_lengths, bag.to_delayed())

    # Keep only the partitions that are not empty
    partitions = [p for n, p in lengths_and_partitions if n > 0]

    # Convert the list of delayed objects back into a Bag.
    return dask.bag.from_delayed(partitions)
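
To make the steps concrete, here is a small walk-through on a bag whose filter leaves most partitions empty. It is a rough sketch with made-up sample data, and it inlines the same steps as the function above rather than calling it, so that the snippet runs on its own.

```python
import dask.bag as db

# Made-up sample data: 20 integers spread over 4 partitions.
bag = db.from_sequence(range(20), npartitions=4)

# Filtering keeps the partition count, so partitions that held only
# small numbers are now empty.
filtered = bag.filter(lambda x: x >= 15)

# Count the items in each partition; list() also copes with lazy iterators.
lengths = filtered.map_partitions(lambda part: len(list(part))).compute()

# Keep only the delayed partitions that hold at least one item.
kept = [p for n, p in zip(lengths, filtered.to_delayed()) if n > 0]
culled = db.from_delayed(kept)

print("partitions before:", filtered.npartitions)
print("partitions after: ", culled.npartitions)
print("items:", sorted(culled.compute()))
```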


Answer 3:

There is no simple API to do this. You could call df.map_partitions(len) to determine which partitions are empty and then remove them explicitly, perhaps by using df.to_delayed() and dask.dataframe.from_delayed(...).

In the future, if you find a function that does not work well with empty partitions, raising an issue would be greatly appreciated: https://github.com/dask/dask/issues/new



Tags: python dask