C# - Extensions: ParallelForEachAsync

Using await Task.WhenAll()

I have found that using Task.WhenAll() on a large number of tasks that execute asynchronously (and concurrently) tends to perform poorly, at least when there is enough IO involved and no moderation/throttling. At the very least, it can put a sudden and possibly significant load on the ThreadPool, which can affect performance everywhere.
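As a rough illustration of the unthrottled pattern, here is a minimal sketch. SimulatedIoAsync is a hypothetical stand-in for a real IO call (it only awaits Task.Delay), and the counters exist purely to show how many operations are in flight at the same moment.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class UnthrottledDemo
{
    private static int _inFlight;
    private static int _peak;

    // Hypothetical stand-in for a real IO call; not part of any library.
    private static async Task SimulatedIoAsync(int item)
    {
        var now = Interlocked.Increment(ref _inFlight);

        // Record the highest number of simultaneous operations seen so far.
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            Interlocked.CompareExchange(ref _peak, now, seen);
        }

        await Task.Delay(200);
        Interlocked.Decrement(ref _inFlight);
    }

    public static async Task<int> RunAsync(int count)
    {
        _inFlight = 0;
        _peak = 0;

        // WhenAll enumerates the sequence, which starts every operation
        // immediately; nothing limits how many run at once.
        await Task.WhenAll(Enumerable.Range(0, count).Select(i => SimulatedIoAsync(i)));

        return _peak;
    }
}
```

Calling await UnthrottledDemo.RunAsync(1000) should report a peak at or very near 1000, since every operation is started before the first one has a chance to finish.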

Parallel.ForEach Async

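The problem with Parallel.ForEach is that its body is an Action<T>, so an async lambda such as async item => { } compiles to an async void. Parallel.ForEach has no Task to wait on, which makes behavior and exceptions really hard to predict, and makes it hard to ensure execution actually finishes (the scope of the method can be left prematurely).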
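A small sketch of that pitfall, assuming a plain console app with no SynchronizationContext. The class and method names are made up for illustration, and the delays are arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    public static async Task<(int atReturn, int later)> RunAsync()
    {
        var completed = 0;

        // The lambda is converted to Action<int>, so it is async void.
        // Parallel.ForEach returns as soon as each body reaches its first await.
        Parallel.ForEach(Enumerable.Range(0, 10), async item =>
        {
            await Task.Delay(200);
            Interlocked.Increment(ref completed);
        });

        // Parallel.ForEach has returned, but the bodies are still pending.
        var atReturn = Volatile.Read(ref completed);

        // Wait long enough for the orphaned continuations to finish.
        await Task.Delay(1000);
        var later = Volatile.Read(ref completed);

        return (atReturn, later);
    }
}
```

Here atReturn is read before any of the delays have elapsed, while later is read after the orphaned bodies have had time to complete. An exception thrown after the await would not surface at the Parallel.ForEach call site either.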

The Stephen Toub Approach

My research led me to partitioning the source with a limit on the degree of parallelism.

I have modified the elegant approach from Stephen Toub's blog post to use modern, functional LINQ.

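using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Extension method: must be declared inside a static class.
public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)
{
    // Drains one partition, awaiting each element before moving to the next.
    async Task AwaitPartition(IEnumerator<T> partition)
    {
        using (partition)
        {
            while (partition.MoveNext())
            {
                await funcBody(partition.Current);
            }
        }
    }

    // One Task per partition, so at most maxDoP elements are in flight at once.
    return Task.WhenAll(
        Partitioner
            .Create(source)
            .GetPartitions(maxDoP)
            .AsParallel()
            .Select(p => AwaitPartition(p)));
}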

This is a highly performant, working ParallelForEachAsync extension for IEnumerables. It uses a local function rather than an async lambda for performance, and also to chain execution to the calling Task.

The logic behind it is quite simple, but it did take me a second to digest it all. Take an IEnumerable (source) and split it into roughly even partitions. Then, in parallel, fire a func/method against each element in each partition. No more than one element per partition is processed at any one time, so at most N Tasks are in flight across the N partitions, where N is the max degree of parallelism.

Example Usage: await myList.ParallelForEachAsync(myFunc, Environment.ProcessorCount);
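For a fuller picture, here is a sketch that checks the throttling claim by tracking how many calls are in flight. The extension is copied in so the example stands alone; ThrottleDemo, Work, and the 20 ms delay are assumptions made up for this illustration, not part of the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Copy of the extension from this post, so the sketch is self-contained.
    public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)
    {
        async Task AwaitPartition(IEnumerator<T> partition)
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await funcBody(partition.Current);
                }
            }
        }

        return Task.WhenAll(
            Partitioner
                .Create(source)
                .GetPartitions(maxDoP)
                .AsParallel()
                .Select(p => AwaitPartition(p)));
    }

    public static async Task<(int processed, int peak)> RunAsync(int count, int maxDoP)
    {
        int inFlight = 0, peak = 0, processed = 0;

        // Hypothetical workload: simulates IO and records peak concurrency.
        async Task Work(int item)
        {
            var now = Interlocked.Increment(ref inFlight);
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
            {
                Interlocked.CompareExchange(ref peak, now, seen);
            }

            await Task.Delay(20);
            Interlocked.Increment(ref processed);
            Interlocked.Decrement(ref inFlight);
        }

        await Enumerable.Range(0, count).ParallelForEachAsync(item => Work(item), maxDoP);
        return (processed, peak);
    }
}
```

With await ThrottleDemo.RunAsync(50, 4), every element should be processed exactly once while peak never exceeds 4, because each partition awaits one element at a time.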

Additional Information

This is part of HouseofCat.Library.Extensions.IEnumerableExtensions! It has a few variations that allow more objects to be passed in during the iteration.