09.03.2010, 15:12 Uhr
Parallele Programmierung mit PLINQ und .NET 4.0
Mit Hilfe der bei .NET 4.0 integrierten Task Parallel Library und deren Erweiterungsmethoden lassen sich auch LINQ-Abfragen einfach parallel ausführen. Gerade in diesem Bereich dürfte sich der Umstand, dass eine Abfrage nicht mehr nur auf einem, sondern auf zwei oder mehr Kernen gleichzeitig ausgeführt wird, vorteilhaft auswirken.
von Marc André ZhouDie Abfragesprache Language Integrated Query (LINQ) war eine der wichtigsten Neuerungen bei .NET 3.5. LINQ bietet eine mächtige Abfragesyntax, die direkt in den Quellcode eingegeben wird, und mit der sich beliebige Collections typisiert abfragen lassen. Mit .NET 4.0 wird LINQ um die Möglichkeit erweitert, nahezu jede LINQ-Abfrage auch parallel auszuführen zu können. Die Grundlage ist die PLINQ Execution Engine (Abbildung 1), die auf der ebenfalls mit .NET 4.0 neu eingeführten Task Parallel Library basiert.PLINQ besteht aus vier Teilen:1) Query Analysis: Analyse der Abfrage2) Data Partitionierung: Algorithmen zur Aufteilung der Datenmenge3) Operator Type: Abfrage-Operatoren4) Merging: Zusammenführung der ErgebnisseDas PLINQ-Programmiermodell ist relativ simpel, die Syntax von LINQ bleibt dabei vollständig erhalten. PLINQ basiert wie LINQ auf Erweiterungsmethoden (Extension Methods), die von der IEnumerable-Schnittstelle stammen. PLINQ bringt mit ParallelEnumerable eine neue Schnittstelle mit, welche die bereits vorhandenen LINQ-Erweiterungsmethoden noch einmal erweitert.Listing 1 veranschaulicht den Umgang mit der AsParallel-Erweiterungsmethode.Listing 1: Paralleles LINQ mit der AsParallel-Methodestatic void ListCustomers(){ var Customers = GetCustomers(); var Orders = GetOrders(); var result = from customer in Customers.AsParallel() join order in Orders.AsParallel() on customer.CustomerID equals order.CustomerIDselect customer;foreach (var item in result)Console.WriteLine("Customer: {0}", item.CustomerName); }
Abbildung 1: Die PLINQ Execution Engine führt LINQ-Abfragen parallel aus
Da die Schnittstelle ParallelEnumerable alle LINQ-Erweiterungsmethoden implementiert, ist keine weitere Anpassung erforderlich und eine bislang sequentiell ausgeführte LINQ-Abfrage wird parallel ausgeführt. Auch wenn das Beispiel in Listing 1 anschaulich demonstriert, dass der Umgang mit PLINQ sehr einfach sein kann, müssen einige Punkte besonders beachtet werden.Fehlerbehandlung mit AggregateExceptionDie gleichzeitige Ausführung von Programmcode bedeutet auch, dass zeitgleich mehrere Fehler auftreten können. Um diese behandeln zu können, werden alle Fehler von der Ausführungsumgebung gesammelt und einer AggregateException hinzugefügt, die entsprechend behandelt werden muss. Listing 2 zeigt, wie im Rahmen einer Fehlerbehandlung auf alle über diese Ausnahme zur Verfügung gestellten aufgetretenen Fehler zugegriffen wird. Bevor mit der Fehlerbehandlung begonnen werden kann, wird die Methode Flatten aufgerufen. Sie extrahiert unter anderem verschachtelte Fehler, da eine AggregateExcetion selbst wieder AggregateExceptions enthalten könnte. Die Methode Handle wird für jede enthaltene Exception aufgerufen. Kann die Exception behandelt werden, gibt die eigene Fehlerbehandlungsroutine true zurück, ansonsten false. Nicht behandelbare Fehler werden erneut ausgelöst und müssen weiter oben im Aufrufstapel behandelt werden. Listing 2: Fehlerbehandlung bei einer parallelen Abfragestatic void ExceptionHandling() {try {var result = Enumerable.Range(1, 200).ToArray().AsParallel() .Select(i => { if (i > 51) throw new Exception("Unknown Error"); if (i > 50) throw new IndexOutOfRangeException("Zahl ist zu hoch: " + i); return i; }).ToArray(); }catch (AggregateException age){ age.Flatten().Handle(exc => { if (exc is IndexOutOfRangeException) { Console.WriteLine(exc.Message); return true; } return false; }); } }
PLINQ-Abfragen können nicht nur unfreiwillig durch Exceptions zum Abbruch gezwungen werden, oft besteht die Notwendigkeit, eine PLINQ-Abfrage im Vorfeld kontrolliert abzubrechen. Für diesen Zweck kann das unter .NET 4.0 neu eingeführte Cancellation-Framework verwendet werden. Es besteht im Wesentlichen aus den zwei Klassen CancellationTokenSource und CancellationToken. Um einen Abbruch zu erreichen, müssen jeweils beide Klassen verwendet werden. Initial wird ein Abbruch über eine Instanz der Klasse CancellationTokenSource durch den Aufruf der Methode Cancel ausgelöst. Das Abbruchsignal wird über das zugehörige CancellationToken weitergegeben. Damit eine PLINQ-Abfrage über einen Abbruchwunsch informiert werden kann, muss der Abfrage über die Methode WithCancellation ein CancellationToken übergeben werden. Listing 3 zeigt ein Beispiel für den Einsatz des Cancellation-Frameworks. Zunächst wird eine Abfrage definiert, dieser wird über die Methode WithCancellation ein CancellationToken übergeben. Danach startet ein Task, der nach 1 Sekunde Wartezeit die Methode Cancel der Instanz CancellationTokenSource aufruft. Über das zugehörige CancellationToken wird der Abbruchwunsch der PLINQ-Abfrage mitgeteilt und die Ausführung der Abfrage terminiert.Listing 3: Abbruch einer PLINQ-Abfragestatic CancellationTokenSource cts = new CancellationTokenSource();
static void CancellationDemoSimple() {var Customers = GetCustomers(); var Orders = GetOrders(); var result = from customer in Customers.AsParallel().WithCancellation(cts.Token) join order in Orders.AsParallel() on customer.CustomerID equals order.CustomerID select customer; Task.Factory.StartNew(() => {Thread.CurrentThread.Join(1000); cts.Cancel(); });foreach (var item in result)Console.WriteLine(item.CustomerName);}
Zeitnahes BeendenWerden innerhalb einer PLINQ-Abfrage langwierige bzw. rechenintensive Delegates aufgerufen, ist ein zeitnaher Abbruch der Abfrage nur kooperativ möglich. Das Beispiel in Listing 4 verdeutlicht dies. Innerhalb der Abfrage wird in der Select-Anweisung eine zeitintensive Operation ausgeführt, die 5 Sekunden zur Ausführung benötigt. Die PLINQ-Abfrage soll aber bereits nach 2 Sekunden abgebrochen werden. Der Abbruch geschieht bei diesem Beispiel jedoch erst nach über 6 Sekunden. Diese Verzögerung resultiert aus dem Umstand, dass PLINQ die Ausführungskontrolle dem User-Delegate übergeben hat und daher nicht selber das CancellationToken abfragen kann. Dies ist erst wieder möglich, nachdem der User-Delegate beendet wurde. Um diese Latenz zu vermeiden, muss der CancellationToken auch innerhalb des User-Delegates überwacht werden. Das CanecallationToken besitzt dafür die Methode ThrowIfCancellationRequested (in Listing 4 ist der Aufruf auskommentiert). Wird der Methodenaufruf aktiviert, endet die Abfrage wie gewünscht zeitnah nach ca. 2 Sekunden.Listing 4: Zeitnahes Beenden einer parallel ausgeführten Methodestatic CancellationTokenSource cts = new CancellationTokenSource();static void CancellationLongRunning(){Task.Factory.StartNew(() => {Thread.CurrentThread.Join(2000); cts.Cancel(); }); var result = Enumerable.Range(0, 1000).AsParallel() .WithCancellation(cts.Token) .Select(i => {for (int j = 0; j < 10; j++) { //cts.Token.ThrowIfCancellationRequested(); Thread.CurrentThread.Join(500); } return i; }); try { foreach (int i in result) Console.WriteLine("Figure: {0}", i); } catch (Exception) { Console.WriteLine("Canceled"); } }
Kosten-Nutzen-AnalyseWird eine LINQ-Abfrage durch AsParallel zu einer parallelen Abfrage, bedeutet dies aber nicht zwingend, dass eine parallele Ausführung stattfindet. PLINQ durchläuft vor der Ausführung eine ,,Query Analysis"-Phase. In dieser Phase prüft PLINQ, ob eine parallele Ausführung für die aktuelle Abfrage überhaupt eine verbesserte Laufzeitgeschwindigkeit erzielen würde. Endet diese Prüfung negativ, wird die Abfrage weiterhin sequentiell ausgeführt. Basis der Entscheidung ist nicht, was naheliegend wäre, die Datenmenge, sondern die Struktur der Abfrage. Verwendet die Abfrage z.B. Operatoren wie Take, SkipWhile, Group by, wird die Abfrage mit höherer Wahrscheinlichkeit sequentiell und nicht parallel ausgeführt. Wenn es sein muss, kann PLINQ über die WithExectionMode mit der Einstellung ParallelExecutionMode.ForceParallelism zu einer parallelen Ausführung gezwungen werden. Ein weiterer wichtiger Punkt, der in diesem Zusammenhang beachtet werden muss, betrifft die Reihenfolge der Ausgabeelemente. Bei einer rein sequentiell ausgeführten LINQ-Abfrage bleibt die Reihenfolge von Ein- und Ausgabeelemente gleich. Das erste Element einer Datenquelle wird als erstes verarbeitet und zurückgegeben. Bei PLINQ findet eine parallele Ausführung statt, daher kann die Reihenfolge, in der die einzelnen Elemente verarbeitet werden, im Vorfeld auch nicht vorhergesagt werden. Mit Hilfe der Methode AsOrdered bleibt die ursprüngliche Reihenfolge der Elemente bei der Ausgabe erhalten. Der Nachteil: Darunter leidet die Verarbeitungsgeschwindigkeit, da die Elemente vor der Ausgabe wieder korrekt eingeordnet werden müssen. Parallele Programmierung bedeutet daher iauch, Kompromisse zu schließen.Über den AutorMarc André Zhou ist Dipl. Inf. (FH) und erwarb zusätzlich den Titel Master of Science. Derzeit ist er für die Logica Deutschland GmbH & Co. KG als Senior Consultant tätig. Weitere Informationen finden Sie auf seinem Blog unter http://www.sw-consultant.de. Peter Monadiemi