Exception is lost while consuming a PLINQ query
Problem Description:
I observed a weird behavior while experimenting with a PLINQ query. Here is the scenario:
- There is a source IEnumerable<int> sequence that contains the two items 1 and 2.
- A Parallel LINQ Select operation is applied on this sequence, projecting each item to itself (x => x).
- The resulting ParallelQuery<int> query is consumed immediately with a foreach loop.
- The selector lambda of the Select successfully projects the item 1.
- The consuming foreach loop throws an exception for the item 1.
- The selector lambda throws an exception for the item 2, after a small delay.
What happens next is that the consuming exception is lost! Apparently it is shadowed by the exception thrown afterwards in the Select. Here is a minimal demonstration of this behavior:
ParallelQuery<int> query = Enumerable.Range(1, 2)
.AsParallel()
.Select(x =>
{
if (x == 2) { Thread.Sleep(500); throw new Exception("Oops!"); }
return x;
});
try
{
foreach (int item in query)
{
Console.WriteLine($"Consuming item #{item} started");
throw new Exception($"Consuming item #{item} failed");
}
}
catch (AggregateException aex)
{
Console.WriteLine($"AggregateException ({aex.InnerExceptions.Count})");
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine($"- {ex.GetType().Name}: {ex.Message}");
}
catch (Exception ex)
{
Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}
Output:
Consuming item #1 started
AggregateException (1)
- Exception: Oops!
Chronologically, the consuming exception happens first, and the PLINQ exception happens later. So my understanding is that the consuming exception is more important, and it should be propagated with priority. Nevertheless, the only exception that is surfaced is the one that occurs inside the PLINQ code.
My question is: why is the consuming exception lost, and is there any way that I can fix the query so that the consuming exception is propagated with priority?
The desirable output is this:
Consuming item #1 started
Exception: Consuming item #1 failed
Solution – 1
I think what you are seeing is the result of the compiler translation of the foreach into a while (MoveNext()) with a try/finally to dispose of the enumerator. When the inner exception is thrown, it is caught by the finally, and the Dispose() of the enumerator causes all the Select threads to finish, which causes an exception inside the finally block, which throws away the initial exception as discussed here. You need to use your own loop and a try/catch if you want to prevent this, though I think the Microsoft recommendation would be to use a try/catch in the Select to be closer to the source of the exception.
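To make the replacement mechanics concrete, here is a minimal sketch of the compiler's lowering of a foreach, using a hypothetical enumerator (ThrowingOnDispose, invented for this illustration) that throws on Dispose the way the PLINQ enumerator does. In C#, an exception thrown from a finally block replaces the exception that is currently unwinding, which is exactly how the consuming exception gets lost:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical stand-in for the PLINQ enumerator: yields 1 and 2,
// and throws from Dispose() like PLINQ does when workers have faulted.
class ThrowingOnDispose : IEnumerable<int>
{
    public IEnumerator<int> GetEnumerator() => new Enumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    class Enumerator : IEnumerator<int>
    {
        private int _current;
        public int Current => _current;
        object IEnumerator.Current => _current;
        public bool MoveNext() => ++_current <= 2;
        public void Reset() { }
        public void Dispose() => throw new Exception("Dispose failed");
    }
}

class Demo
{
    static void Main()
    {
        var source = new ThrowingOnDispose();
        try
        {
            // Roughly what the compiler emits for: foreach (int item in source) ...
            IEnumerator<int> e = source.GetEnumerator();
            try
            {
                while (e.MoveNext())
                    throw new Exception($"Consuming item #{e.Current} failed");
            }
            finally
            {
                // The consuming exception is unwinding through here;
                // the exception from Dispose() replaces it.
                e.Dispose();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message); // prints "Dispose failed"
        }
    }
}
```

The consuming exception ("Consuming item #1 failed") never reaches the catch block; only the Dispose exception surfaces, mirroring the behavior in the question.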
Here is a modification of your existing code, replacing the foreach with the compiler-generated expansion of foreach using an enumerator. (I use LINQPad to see the C# 1.0 equivalent code / IL code from the compiler.)
You can capture any exceptions during the Dispose of the enumerator and then bundle them up with the original exception into an AggregateException when you catch them.
I wrapped the boilerplate into an extension method to replace the normal foreach:
var b = true;
var query = Enumerable.Range(1, 3)
.AsParallel()
.Select(x => {
Thread.Sleep(50 * (x - 1));
Console.WriteLine($"Select({x})");
if (x >= 2) {
throw new Exception($"Oops {x}!");
}
return x;
});
try {
query.ForEachAggregatingExceptions(item => {
Console.WriteLine($"Consuming item #{item} started");
if (b) {
throw new Exception($"Consuming item #{item} failed");
}
});
}
catch (AggregateException aex) {
Console.WriteLine($"AggregateException ({aex.InnerExceptions.Count})");
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine($"- {ex.GetType().Name}: {ex.Message}");
}
catch (Exception ex) {
Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}
public static class ParallelQueryExt {
public static void ForEachAggregatingExceptions<T>(this ParallelQuery<T> pq, Action<T> processFn) {
Exception FirstException = null;
var e = pq.GetEnumerator();
try {
while (e.MoveNext())
processFn(e.Current);
}
catch (Exception ex) {
FirstException = ex;
}
finally {
if (e != null) {
try {
e.Dispose();
}
catch (AggregateException aex) { // combine exceptions from Dispose with FirstException if any
if (FirstException != null) {
throw new AggregateException(aex.InnerExceptions.Prepend(FirstException));
}
else
throw;
}
catch (Exception ex) { // combine single exception from Dispose with FirstException if any
    if (FirstException != null)
        throw new AggregateException(FirstException, ex);
    else
        throw; // avoid wrapping a null FirstException, which AggregateException rejects
}
if (FirstException != null) // re-throw FirstException if no others occurred
throw FirstException;
}
}
}
}
PS The b variable and the if prevent the compiler from optimizing the while loop into an if, since it can figure out that the throw will prevent the loop from executing more than one pass.
Solution – 2
NetMage’s answer explains that the observed behavior is caused by the error thrown on Dispose of the PLINQ enumerator. My guess about why the PLINQ library violates the common wisdom about throwing exceptions on Dispose, which is to avoid throwing unless the error is critical, is that the library was introduced in .NET 4.0. In that .NET version, an unobserved faulted Task resulted in the termination of the process. The process crashed when the faulted Task was garbage collected, after raising the TaskScheduler.UnobservedTaskException event. So the PLINQ designers had to choose between throwing on Dispose, swallowing the exception completely, or crashing the process, and they chose what seemed like the lesser evil of the available options. That’s my guess.
Had the library been authored on .NET 4.5, they might have decided differently. In that .NET version, the process no longer crashes when an unobserved faulted Task is garbage collected. Reverting to the .NET 4.0 policy is still possible through a configuration setting, but I doubt that anyone ever used this setting to revert to the original irrational behavior.
My approach for fixing PLINQ’s error-losing behavior is a bit different than NetMage’s. Instead of bundling all errors in an AggregateException, I prefer to suppress the exception that is thrown by PLINQ on Dispose, and propagate it through the TaskScheduler.UnobservedTaskException mechanism. This can be achieved easily by just creating a faulted task with the Task.FromException method, and leaving it unobserved:
/// <summary>
/// Suppresses the error that might be thrown by the enumerator on Dispose.
/// The error triggers the TaskScheduler.UnobservedTaskException event.
/// </summary>
public static IEnumerable<TSource> SuppressDisposeException<TSource>(
this IEnumerable<TSource> source)
{
ArgumentNullException.ThrowIfNull(source);
IEnumerator<TSource> enumerator = source.GetEnumerator();
try
{
while (enumerator.MoveNext()) yield return enumerator.Current;
try { enumerator.Dispose(); } finally { enumerator = null; }
}
finally
{
try { enumerator?.Dispose(); }
catch (Exception ex) { _ = Task.FromException(ex); }
}
}
The finally block of the iterator can be invoked either by the MoveNext or the Dispose of the autogenerated enumerator. In case it is invoked by MoveNext, we want a source Dispose exception to propagate normally. We only want to suppress it if it happens during the autogenerated Dispose. That’s the reason for disposing the source enumerator preferentially in the try block, and only as a fallback in the finally block.
Usage example:
IEnumerable<int> query = Enumerable.Range(1, 2)
.AsParallel()
.Select(x => /* ... */ x)
.SuppressDisposeException();
In order to watch the TaskScheduler.UnobservedTaskException event being triggered, you might have to call GC.Collect as part of the test.
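Here is a minimal sketch of such a test. It simulates what SuppressDisposeException does with a Dispose error, by creating a faulted task with Task.FromException and abandoning it; whether and when the event fires can depend on GC timing and build configuration, so this is illustrative rather than guaranteed:

```csharp
using System;
using System.Threading.Tasks;

class Demo
{
    static void Main()
    {
        TaskScheduler.UnobservedTaskException += (sender, e) =>
        {
            // e.Exception is an AggregateException wrapping the original error.
            Console.WriteLine($"Unobserved: {e.Exception.InnerException?.Message}");
            e.SetObserved();
        };

        // Create the faulted task in a separate method, so that no live
        // reference to it remains when we force a garbage collection.
        CreateAndAbandonFaultedTask();

        // The event is raised when the faulted Task's exception holder is
        // finalized, so force a collection and wait for the finalizers.
        GC.Collect();
        GC.WaitForPendingFinalizers();
    }

    static void CreateAndAbandonFaultedTask()
    {
        _ = Task.FromException(new Exception("Oops!"));
    }
}
```

If everything lines up, the handler reports the suppressed "Oops!" exception on the finalizer thread. Note that on .NET Core / modern .NET the event is purely informational; the process does not crash either way.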
The justification for suppressing the exception on Dispose from the synchronous execution flow is that I consider the parallel nature of PLINQ a form of speculative execution. The PLINQ engine might do more work than the consumer of the query is interested in receiving. So in case the consumer abandons the enumeration prematurely, either voluntarily by breaking out of the foreach loop, or unwillingly because it suffered an exception, PLINQ should not bother the consumer with anything that might happen past the point where the consumer lost interest in the enumeration.