Chapter 19
Parallel Programming Using Tasks and Threads
What's in this chapter?
Understanding the task-based programming model, the Task Parallel Library, and its related hardware technologies
Launching, controlling, managing, and synchronizing parallel tasks
Refactoring loops to run them in parallel using Parallel.For and Parallel.ForEach
Transforming existing sequential code into parallelized code
Measuring the speed gain and the scalability offered by parallelized code
Working with different degrees of parallelism
Understanding the advantages of working with concurrent collections
Implementing a simple parallel producer-consumer pattern
Parallelizing LINQ queries using PLINQ
In the last few years, multicore technology has become mainstream in CPU design, and microprocessor manufacturers continue to improve their processing power. However, the shift to multicore is an inflection point for software design philosophy.
This chapter is about the lightweight concurrency model introduced by Visual Basic 2010 with .NET Framework 4 and extended in Visual Basic 2012 with .NET Framework 4.5. A comprehensive treatment of the challenges offered by the new multicore designs could easily fill 600 pages or more, so this chapter attempts to strike a reasonable balance between detail and succinctness.
The top diagram represents an almost ideal situation: the four subroutines running in parallel. It is very important to consider the necessary time to schedule the concurrent tasks, which adds an initial overhead to the overall time.
The middle diagram shows a scenario with just two concurrent lanes and four subroutines to run. On one lane, once ConvertEllipses finishes, ConvertRectangles starts. On the other lane, once ConvertLines finishes, ConvertText starts. Parallel.Invoke takes more time than the previous scenario to run all the subroutines.
The bottom diagram shows another scenario with three concurrent lanes. However, it takes almost the same time as the middle scenario, because in this case the ConvertLines subroutine takes more time to run. Thus, Parallel.Invoke takes almost the same time as the previous scenario to run all the subroutines, even though it uses one additional parallel lane.
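The scenarios in these diagrams can be reproduced with a small console sketch. The bodies of ConvertEllipses, ConvertRectangles, ConvertLines, and ConvertText are hypothetical stand-ins that only call Thread.Sleep, with ConvertLines made the slowest, and RunConversions is a hypothetical helper. The point is that Parallel.Invoke returns only when the slowest delegate finishes, however many lanes are available.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks

Module InvokeSketch
    ' Names of the (hypothetical) conversion subroutines that have finished
    Private Completed As New ConcurrentBag(Of String)()

    ' Hypothetical stand-ins: Thread.Sleep simulates work of different lengths
    Sub ConvertEllipses()
        Thread.Sleep(200)
        Completed.Add("Ellipses")
    End Sub

    Sub ConvertRectangles()
        Thread.Sleep(200)
        Completed.Add("Rectangles")
    End Sub

    Sub ConvertLines()
        Thread.Sleep(600) ' the slowest subroutine
        Completed.Add("Lines")
    End Sub

    Sub ConvertText()
        Thread.Sleep(200)
        Completed.Add("Text")
    End Sub

    ' Runs the four subroutines and returns how many of them finished
    Function RunConversions() As Integer
        Completed = New ConcurrentBag(Of String)()
        ' Parallel.Invoke does not return until every delegate has finished
        Parallel.Invoke(Sub() ConvertEllipses(),
                        Sub() ConvertRectangles(),
                        Sub() ConvertLines(),
                        Sub() ConvertText())
        Return Completed.Count
    End Function

    Sub Main()
        Dim sw = Stopwatch.StartNew()
        Dim finished = RunConversions()
        ' The elapsed time can't be shorter than the slowest subroutine (ConvertLines)
        Console.WriteLine("{0} subroutines finished in {1} ms", finished, sw.ElapsedMilliseconds)
    End Sub
End Module
```

On a machine with fewer logical cores than delegates, the same code still works; the delegates simply share the available lanes.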
The key advantage of using Parallel.Invoke is its simplicity; you can run many subroutines in parallel without having to worry about tasks or threads. However, it isn't suitable for every situation in which it is possible to take advantage of parallel execution. Parallel.Invoke has many trade-offs, including the following:
The previously explained example provides a good transition to the differences between parallelism and concurrency, because they aren't the same thing, as shown in .
Concurrency means that different parts of code can start, run, and complete in overlapping time periods. Concurrency can happen even on computers with a single logical core. When many parts of code run concurrently on a computer with a single logical core, time-slicing mechanisms and fast context switches can offer the impression of parallel execution. However, on this hardware, it requires more time to run many parts of code concurrently than to run a single part of code alone, because the concurrent code is competing for hardware resources (refer to ). You can think of concurrency as many cars sharing a single lane. This is why concurrency is also defined as a form of virtual parallelism, but it isn't real parallelism.
Parallelism means that different parts of code can actually run simultaneously, i.e., at the same time, taking advantage of real parallel processing capabilities found in the underlying hardware. Parallelism isn't possible on computers with a single logical core. You need at least two logical cores in order to run parallel code. When many parts of code run in parallel on a computer with multiple logical cores, time-slicing mechanisms and context switches also occur because typically many other parts of code are trying to use processor time. However, when real parallelism occurs, you can achieve speed gains because many parts of code running in parallel can reduce the overall necessary time to complete certain algorithms. The diagram shown in offers two possible parallelism scenarios:
GenerateAESKeys and GenerateMD5Hashes need approximately 14 seconds to run. The first one takes 6 seconds and the latter 8 seconds. Of course, these times will vary considerably according to the underlying hardware configuration.
There is no interaction between these two subroutines; they are completely independent of each other. Because the subroutines run one after the other, sequentially, they aren't taking advantage of the parallel processing capabilities offered by the additional core(s). Therefore, these two subroutines represent a clear hotspot where parallelism could help to achieve a significant speedup over sequential execution. For example, it is possible to run both subroutines in parallel using Parallel.Invoke.
Replace the Main subroutine shown in the simple console application with the following new version, launching both GenerateAESKeys and GenerateMD5Hashes in parallel, using Parallel.Invoke (code file: Snippet02.sln):
Sub Main()
    Dim sw = Stopwatch.StartNew()
    Parallel.Invoke(Sub() GenerateAESKeys(), Sub() GenerateMD5Hashes())
    Console.WriteLine(sw.Elapsed.ToString())
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub
shows the parallel execution flow for the new version of this application and the time it takes to run each of the two subroutines on a specific computer with a dual-core microprocessor.
Now, GenerateAESKeys and GenerateMD5Hashes need approximately nine seconds to run because they take advantage of both cores offered by the microprocessor. Thus, it is possible to calculate the speedup achieved using the following formula:
Speedup = (Serial execution time)/(Parallel execution time)
In the preceding example, the speedup is 14/9 = 1.56, usually expressed as a 1.56x speedup over the sequential version. GenerateMD5Hashes takes more time than GenerateAESKeys to run, nine seconds versus six seconds. However, Parallel.Invoke doesn't continue with the next line until all the delegates finish their execution. Therefore, for approximately three seconds, the application is not taking advantage of one of the cores, as shown in .
In addition, if this application runs on a computer with a quad-core microprocessor, its speedup over the sequential version would be nearly the same, as it won't scale to take advantage of the two additional cores found in the underlying hardware.
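The speedup formula can be turned into a small helper that works with the elapsed times a Stopwatch reports. This is a sketch: Speedup and Measure are hypothetical helper names, and the Thread.Sleep calls in Main merely simulate two independent subroutines rather than the chapter's AES and MD5 methods.

```vbnet
Imports System
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks

Module SpeedupSketch
    ' Speedup = (Serial execution time) / (Parallel execution time)
    Function Speedup(serialTime As TimeSpan, parallelTime As TimeSpan) As Double
        Return serialTime.TotalSeconds / parallelTime.TotalSeconds
    End Function

    ' Times a delegate with a Stopwatch, the same technique used to find hotspots
    Function Measure(work As Action) As TimeSpan
        Dim sw = Stopwatch.StartNew()
        work()
        Return sw.Elapsed
    End Function

    Sub Main()
        ' The figures from the text: 14 seconds sequential, 9 seconds in parallel
        Console.WriteLine("From the text: {0:F2}x",
                          Speedup(TimeSpan.FromSeconds(14), TimeSpan.FromSeconds(9)))

        ' Simulated independent subroutines (Thread.Sleep stands in for real work)
        Dim serialTime = Measure(Sub()
                                     Thread.Sleep(300)
                                     Thread.Sleep(200)
                                 End Sub)
        Dim parallelTime = Measure(Sub() Parallel.Invoke(Sub() Thread.Sleep(300), Sub() Thread.Sleep(200)))
        Console.WriteLine("Measured: {0:F2}x", Speedup(serialTime, parallelTime))
    End Sub
End Module
```

Because the longer simulated subroutine dominates the parallel time, the measured value stays well below 2x even on a dual-core machine, which mirrors the idle-core effect described above.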
In this section, you saw how it is possible to detect hotspots by adding some code to measure the elapsed time to run certain methods. By changing just a few lines of code, a noticeable improvement in speed was achieved. Now it is time to learn other TPL structures that can help to achieve better results and offer improved scalability when the number of available cores increases.
Now, uncomment the lines that send output to the console in both GenerateAESKeys and GenerateMD5Hashes:
Console.WriteLine(ConvertToHexString(result))
Writing to the console will generate a bottleneck for the parallel execution. However, this time, there is no need to measure accurate times. Instead, you can view the output to determine that both methods are running in parallel. The following lines show a sample console output generated by this application. The highlighted lines, the shorter hexadecimal strings, correspond to the MD5 hashes. The others represent AES keys. Each AES key takes less time to generate than each MD5 hash. Remember that the code creates 800,000 AES keys (NUM_AES_KEYS) and 900,000 MD5 hashes (NUM_MD5_HASHES). Depending on your environment, the code might take a long time to complete its execution.
0364DBC9A8FA3EAC793FC53AAE6D0193484087634C3033C470D96C72F89D7254
E410BCB82B36729CB7CCCCDFE30746F2DF141CC8275790360E2ED731F8C7113D
66CF85EA8FC77746A7C4A116F68D802D7167AE9E7C5FB0B6B85D44B8929386DE
0421897DCF492380BADF872205AE32D94632C60022A4E965652524D7023C59AD
C3BEF1DFFF5A9CAB11BFF8EA3F7DEFC97D91562A358DB56477AD445ACB4F1DE3
AF521D65489CA5C69517E32E652D464676E5F2487E438124DBF9ACF4157301AA
A641EB67C88A29985CFB0B2097B12CFB9296B4659E0949F20271984A3868E0B3
D7A05587DFDFD0C49BEF613F2EB78A43
90BF115C60B2DECA60C237F3D06E42EE
B3519CBA0137FD814C09371836F90322
1415C19F7F93306D35186721AF6B8DDE56427BB9AF29D22E37B34CB49E96BB49
208B73D3E6468F48B950E5F5006DDF30FE7A1B3BCC46489F7722BD98D54079D7
ACD0312DFF1BF29ECA2721DAFA9B20AB5FBDBD20E76C150C5CCE4026990C9D26
EB68C902145439F2A66514B9D89E9A958F18EE15D491014D3DCB312781F277D1
9DB8ABF087C78091F1E77AC769FF175A
F3EFB2804A969D890AFABCE17E84B26E
B342A8A253003754B752B85C67DA1560F30CD36A1AA759A0010E1F8E5045CBB5
9681656DC08F29AB1911A1CCCFBE6B468D1DF7B9D8722324E5E2BB4A314EC649
7DE56E111213655F54D6F8656238CA5E
196D194BA2B786EADD1B6852645C67C5
BA7AC6B878064E98D98336CA5DE45DEC
875DAB451CCE3B5FBD8E5091BAD1A8ED7DB2FF8C9E3EEA834C6DEA7C2467F27E
C1AA2CB88AB669317CB90CD842BF01DB26C6A655D10660AF01C37ECC7AEDA267
66E1F4F56E04FC9BFF225F68008A129D93F9B277ADAB43FF764FB87FFD098B78
Now, comment out the lines that send output to the console in both GenerateAESKeys and GenerateMD5Hashes again.
Sometimes, refactoring an existing For loop, as previously explained, can be a very complex task, and the changes to the code could generate too much overhead for each iteration, reducing the overall performance. Another useful alternative is to partition all the data to be processed into parts that can be run as smaller loops in parallel, defining a custom partitioner, a tailored mechanism to split the input data into specific pieces that overrides the default partitioning mechanism. It is possible to use a Parallel.ForEach loop with a custom partitioner in order to create new versions of the sequential loops with a simpler refactoring process.
The code in file Listing05.sln shows the new code with the refactored loops using the imperative syntax to implement data parallelism offered by Parallel.ForEach, combined with a sequential For loop and a custom partitioner created with System.Collections.Concurrent.Partitioner. The new methods, ParallelPartitionGenerateAESKeys and ParallelPartitionGenerateMD5Hashes, also try to take advantage of all the cores available, relying on the work done under the hood by Parallel.ForEach and the range partitioning performed to distribute smaller sequential loops inside as many parallel loops as available cores. The code also optimizes its behavior according to the existing hardware at run time.
The code uses another important namespace for TPL, the new System.Collections.Concurrent namespace. This namespace offers access to useful collections prepared for concurrency and custom partitioners introduced for the first time in .NET Framework 4. Therefore, it is a good idea to import this namespace by using Imports System.Collections.Concurrent to work with the new examples (code file: Listing05.sln):
Sub ParallelPartitionGenerateAESKeys()
    Dim sw = Stopwatch.StartNew()
    Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1),
        Sub(range)
            Dim aesM As New AesManaged()
            Debug.WriteLine("Range ({0}, {1}. Time: {2})",
                range.Item1, range.Item2, Now().TimeOfDay)
            For i As Integer = range.Item1 To range.Item2 - 1
                aesM.GenerateKey()
                Dim result = aesM.Key
                Dim hexString = ConvertToHexString(result)
                ' Console.WriteLine("AES: " +
                '     ConvertToHexString(result))
            Next
        End Sub)
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Sub ParallelPartitionGenerateMD5Hashes()
    Dim sw = Stopwatch.StartNew()
    Parallel.ForEach(Partitioner.Create(1, NUM_MD5_HASHES + 1),
        Sub(range)
            Dim md5M As MD5 = MD5.Create()
            For i As Integer = range.Item1 To range.Item2 - 1
                Dim data = Encoding.Unicode.GetBytes(i.ToString())
                Dim result = md5M.ComputeHash(data)
                Dim hexString = ConvertToHexString(result)
                ' Console.WriteLine(ConvertToHexString(result))
            Next
        End Sub)
    Console.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub
The class function Parallel.ForEach offers 20 overloads. The definition used in this code file has the following parameters:
In addition, Parallel.ForEach can return a ParallelLoopResult value. The information offered in this structure is covered in detail later in this chapter.
The code in file Listing03.sln showed the original GenerateAESKeys subroutine with the sequential For loop. The highlighted lines of code shown in file Listing05.sln represent the same sequential For loop. The only line that changes is the For definition, which takes into account the lower bound and the upper bound of the partition assigned by range.Item1 and range.Item2:
For i As Integer = range.Item1 To range.Item2 - 1
In this case, it is easier to refactor the sequential loop because there is no need to move local variables. The only difference is that instead of working with the entire source data, it splits it into many independent and potentially parallel partitions. Each one works with a sequential inner loop.
The following call to the Partitioner.Create method defines the partitions as the first parameter for Parallel.ForEach:
Partitioner.Create(1, NUM_AES_KEYS + 1)
This line splits the range from 1 to NUM_AES_KEYS into many partitions, each one represented as a Tuple(Of Integer, Integer) whose Item1 holds the inclusive lower bound and whose Item2 holds the exclusive upper bound. However, it doesn't specify the number of partitions to create. ParallelPartitionGenerateAESKeys includes a line to write the lower and upper bounds of each generated partition and the actual time when it starts to run the sequential loop for this range.
Debug.WriteLine("Range ({0}, {1}. Time: {2})", range.Item1, range.Item2, Now().TimeOfDay)
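To see the tuples that a range partitioner hands to the loop body, you can enumerate its partitions directly from a single thread. This sketch (ListRanges is a hypothetical helper) passes an explicit range size of 25, so the range from 1 to 100 yields four tuples, (1, 26), (26, 51), (51, 76), and (76, 101), each with an exclusive upper bound. When no range size is given, as in the call shown earlier, the partitioner chooses a default; in the .NET Framework source that default appears to be (toExclusive - fromInclusive) / (ProcessorCount * 3), which is consistent with the 66,666-value ranges in the sample output for four logical cores, but treat it as an implementation detail rather than a documented contract.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module PartitionerSketch
    ' Collects the (inclusive lower bound, exclusive upper bound) tuples
    ' produced by a range partitioner
    Function ListRanges(fromInclusive As Integer, toExclusive As Integer,
                        rangeSize As Integer) As List(Of Tuple(Of Integer, Integer))
        Dim ranges As New List(Of Tuple(Of Integer, Integer))()
        Dim rangePartitioner = Partitioner.Create(fromInclusive, toExclusive, rangeSize)
        ' A single enumerator over the dynamic partitions consumes every range, in order
        For Each range In rangePartitioner.GetDynamicPartitions()
            ranges.Add(range)
        Next
        Return ranges
    End Function

    Sub Main()
        For Each range In ListRanges(1, 101, 25)
            Console.WriteLine("({0}, {1})", range.Item1, range.Item2)
        Next
    End Sub
End Module
```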
Replace the Main subroutine with the following new version, launching first ParallelPartitionGenerateAESKeys and then ParallelPartitionGenerateMD5Hashes (code file: Listing05.sln):
Sub Main()
    Dim sw = Stopwatch.StartNew()
    ParallelPartitionGenerateAESKeys()
    ParallelPartitionGenerateMD5Hashes()
    Console.WriteLine(sw.Elapsed.ToString())
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub
As shown in the following lines, the partitioner creates 13 ranges, each spanning 66,666 values except the last, shorter one. Thus, Parallel.ForEach will run 13 sequential inner For loops, one per range. However, they don't all start at the same time, because that wouldn't be a good idea with four cores available. The parallelized loop tries to load-balance the execution, taking into account the available hardware resources. The second line of the output shows the complexity added by both parallelism and concurrency: if you take into account the time, the first partition that reaches the sequential inner For loop is (66667, 133333) and not (1, 66667). Remember that the upper bound values shown in the following output are exclusive.
Range (133333, 199999. Time: 15:45:38.2205775)
Range (66667, 133333. Time: 15:45:38.2049775)
Range (266665, 333331. Time: 15:45:38.2361775)
Range (199999, 266665. Time: 15:45:38.2205775)
Range (1, 66667. Time: 15:45:38.2205775)
Range (333331, 399997. Time: 15:45:39.0317789)
Range (399997, 466663. Time: 15:45:39.0317789)
Range (466663, 533329. Time: 15:45:39.1097790)
Range (533329, 599995. Time: 15:45:39.2345793)
Range (599995, 666661. Time: 15:45:39.3281794)
Range (666661, 733327. Time: 15:45:39.9365805)
Range (733327, 799993. Time: 15:45:40.0145806)
Range (799993, 800001. Time: 15:45:40.1705809)
In addition, the ranges don't appear in the debug output in ascending order, because there are many concurrent calls to WriteLine. In fact, when measuring speedups, it is very important to comment out the Debug.WriteLine line that runs before each inner loop begins, because it affects the overall time by generating a bottleneck.
This new version using Parallel.ForEach with custom partitions needs approximately the same time as the previous Parallel.For version to run.
It is possible to tune the generated partitions in order to match them with the number of logical cores found at run time. System.Environment.ProcessorCount offers the number of logical cores or logical processors detected by the operating system. Hence, it is possible to use this value to calculate the desired range size for each partition and use it as a third parameter for the call to Partitioner.Create, using the following formula:
((numberOfElements / numberOfLogicalCores) + 1)
ParallelPartitionGenerateAESKeys can use the following code to create the partitions:
Partitioner.Create(1, NUM_AES_KEYS, (CInt(NUM_AES_KEYS / Environment.ProcessorCount) + 1))
A very similar line can also help to improve ParallelPartitionGenerateMD5Hashes:
Partitioner.Create(1, NUM_MD5_HASHES, (CInt(NUM_MD5_HASHES / Environment.ProcessorCount) + 1))
As shown in the following lines, now the partitioner creates four ranges because the desired range size is CInt((800000 / 4) + 1) = 200001. Thus, the Parallel.ForEach will run four sequential inner For loops with ranges, according to the number of available logical cores.
Range (1, 200002. Time: 16:32:51.3754528)
Range (600004, 800000. Time: 16:32:51.3754528)
Range (400003, 600004. Time: 16:32:51.3754528)
Range (200002, 400003. Time: 16:32:51.3754528)
Now, ParallelPartitionGenerateAESKeys and ParallelPartitionGenerateMD5Hashes need approximately 3.4 seconds to run, because each one generates as many partitions as there are available cores and uses a sequential loop in each delegate, which reduces the previously added overhead. Thus, the speedup achieved is 11 / 3.4, approximately 3.24x over the sequential version. The reduced overhead makes it possible to cut the time from 4.1 seconds to 3.4 seconds.
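The range-size formula can be wrapped in a helper so that the partitions follow Environment.ProcessorCount. This sketch uses hypothetical helper names (RangeSizeFor, CountProcessed) and a simple counter in place of the real key generation. It uses integer division (\) instead of CInt on a Double, and it passes count + 1 as the exclusive upper bound: because the upper bound is exclusive, passing NUM_AES_KEYS (as in the output above, which ends at 800000) leaves the last value unprocessed.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module CorePartitionSketch
    ' Range size from the formula: (numberOfElements / numberOfLogicalCores) + 1
    Function RangeSizeFor(fromInclusive As Integer, toExclusive As Integer,
                          coreCount As Integer) As Integer
        Return (toExclusive - fromInclusive) \ coreCount + 1
    End Function

    ' Processes every value in [1, count] using roughly one range per core
    ' and returns how many values were visited
    Function CountProcessed(count As Integer, coreCount As Integer) As Integer
        Dim processed As Integer = 0
        ' toExclusive is count + 1 so that the value count itself is included
        Dim rangeSize = RangeSizeFor(1, count + 1, coreCount)
        Parallel.ForEach(Partitioner.Create(1, count + 1, rangeSize),
            Sub(range)
                Dim localCount As Integer = 0
                For i As Integer = range.Item1 To range.Item2 - 1
                    localCount += 1 ' stand-in for the real work on value i
                Next
                ' One synchronized update per partition, not per value
                Interlocked.Add(processed, localCount)
            End Sub)
        Return processed
    End Function

    Sub Main()
        Dim cores = Environment.ProcessorCount
        Console.WriteLine("Range size for 800,000 values on {0} cores: {1}",
                          cores, RangeSizeFor(1, 800001, cores))
        Console.WriteLine("Processed: {0}", CountProcessed(800000, cores))
    End Sub
End Module
```

Keeping a local count inside each partition and publishing it once with Interlocked.Add avoids synchronizing on every value, in the same spirit as using a sequential loop per partition.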
The diagram shown in represents one of the possible execution flows with the numbers for the lower and upper bounds for each partition, taking advantage of the four cores with the optimized partitioning scheme.
Parallel.ForEach is also useful to refactor existing For Each loops that iterate over a collection that exposes an IEnumerable interface. The simplest definition of the class function Parallel.ForEach, used in the following code (code file: Listing08.sln) to generate a new version of the MD5 hashes generation subroutine, ParallelForEachGenerateMD5Hashes, has the following parameters:
Private Function GenerateMD5InputData() As IEnumerable(Of Integer)
    Return Enumerable.Range(1, NUM_AES_KEYS)
End Function

Sub ParallelForEachGenerateMD5Hashes()
    Dim sw = Stopwatch.StartNew()
    Dim inputData = GenerateMD5InputData()
    Parallel.ForEach(inputData, Sub(number As Integer)
            Dim md5M As MD5 = MD5.Create()
            Dim data = Encoding.Unicode.GetBytes(number.ToString())
            ' Declared locally so that concurrent iterations don't share them
            Dim result = md5M.ComputeHash(data)
            Dim hexString = ConvertToHexString(result)
            ' Console.WriteLine(ConvertToHexString(result))
        End Sub)
    Debug.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub
The GenerateMD5InputData function returns a sequence of Integer numbers from 1 to NUM_AES_KEYS (inclusive). Instead of using the loop to control the numbers for the iteration, the code in the ParallelForEachGenerateMD5Hashes subroutine saves this sequence in the inputData local variable.
The following line calls Parallel.ForEach with the source (inputData) and a multiline lambda delegate subroutine, receiving the number for each iteration:
Parallel.ForEach(inputData, Sub(number As Integer)
The line that prepares the input data for the hash computing method also changes to use the value found in number:
Dim data = Encoding.Unicode.GetBytes(number.ToString())
If you want to interrupt a sequential loop, you can use Exit For, which works with both For and For Each loops. Parallel loops require more complex code: the delegate body is called once for each iteration, so exiting it with Exit Sub or Exit Function only ends the current iteration and has no effect on the parallel loop's execution. In addition, because the body is a delegate, it is disconnected from the traditional loop structure.
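The following sketch shows why exiting the delegate does not interrupt a parallel loop: Exit Sub behaves like Continue For in a sequential loop. CountIterations is a hypothetical helper; every one of the 1,000 iterations still enters the body, even though most of them leave it early.

```vbnet
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module ExitSubSketch
    ' Counts how many iterations enter the loop body when every iteration
    ' at or above exitAt calls Exit Sub
    Function CountIterations(total As Integer, exitAt As Integer) As Integer
        Dim entered As Integer = 0
        Parallel.For(0, total,
            Sub(i As Integer)
                Interlocked.Increment(entered)
                If i >= exitAt Then
                    ' Ends only this iteration, like Continue For;
                    ' the remaining iterations keep running
                    Exit Sub
                End If
                ' ... work for iterations below exitAt would go here ...
            End Sub)
        Return entered
    End Function

    Sub Main()
        Console.WriteLine(CountIterations(1000, 10))
    End Sub
End Module
```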
The following code (code file: Listing09.sln) shows a new version of the ParallelForEachGenerateMD5Hashes subroutine, called ParallelForEachGenerateMD5HashesBreak. Now, the loopResult local variable saves the result of calling the Parallel.ForEach class function. Moreover, the delegate body subroutine receives a second parameter—a ParallelLoopState instance:
Dim loopResult = Parallel.ForEach(
    inputData,
    Sub(number As Integer, loopState As ParallelLoopState)

Private Sub DisplayParallelLoopResult(
    ByVal loopResult As ParallelLoopResult)
    Dim text As String
    If loopResult.IsCompleted Then
        text = "The loop ran to completion."
    Else
        If loopResult.LowestBreakIteration.HasValue = False Then
            text = "The loop ended prematurely with a Stop statement."
        Else
            text = "The loop ended by calling the Break statement."
        End If
    End If
    Console.WriteLine(text)
End Sub

Sub ParallelForEachGenerateMD5HashesBreak()
    Dim sw = Stopwatch.StartNew()
    Dim inputData = GenerateMD5InputData()
    Dim loopResult = Parallel.ForEach(
        inputData,
        Sub(number As Integer, loopState As ParallelLoopState)
            'If loopState.ShouldExitCurrentIteration Then
            '    Exit Sub
            'End If
            Dim md5M As MD5 = MD5.Create()
            Dim data = Encoding.Unicode.GetBytes(number.ToString())
            Dim result = md5M.ComputeHash(data)
            Dim hexString = ConvertToHexString(result)
            If (sw.Elapsed.Seconds > 3) Then
                loopState.Break()
                Exit Sub
            End If
            ' Console.WriteLine(ConvertToHexString(result))
        End Sub)
    DisplayParallelLoopResult(loopResult)
    Console.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

Private Function GenerateMD5InputData() As IEnumerable(Of Integer)
    Return Enumerable.Range(1, NUM_AES_KEYS)
End Function
The instance of ParallelLoopState (loopState) offers two methods to cease the execution of a Parallel.For or Parallel.ForEach:
The code shown previously from file Listing09.sln calls the Break method if the elapsed time is more than 3 seconds:
If (sw.Elapsed.Seconds > 3) Then
    loopState.Break()
    Exit Sub
End If
It is very important to note that the code in the multiline lambda is accessing the sw variable that is defined in ParallelForEachGenerateMD5HashesBreak. It reads the value of the Seconds read-only property.
It is also possible to check the value of the ShouldExitCurrentIteration read-only property in order to make decisions when the current or other concurrent iterations make requests to stop the parallel loop execution. The code in file Listing09.sln includes a few commented-out lines that check whether ShouldExitCurrentIteration is True:
If loopState.ShouldExitCurrentIteration Then
    Exit Sub
End If
If the property is True, the code exits the subroutine, avoiding unnecessary work in iterations that no longer need to run. The lines are commented out because in this case an additional iteration isn't a problem; therefore, it isn't necessary to add this additional check to each iteration.
Once the Parallel.ForEach finishes its execution, loopResult has information about the results, in a ParallelLoopResult structure.
The DisplayParallelLoopResult subroutine shown in the code file Listing09.sln receives a ParallelLoopResult structure, evaluates its read-only properties, and outputs the results of executing the Parallel.ForEach loop to the console. explains the three possible results in this example.
Condition | Description |
IsCompleted = True | The loop ran to completion. |
IsCompleted = False And LowestBreakIteration.HasValue = False | The loop ended prematurely with a Stop statement. |
IsCompleted = False And LowestBreakIteration.HasValue = True | The loop ended by calling the Break statement. The LowestBreakIteration property holds the value of the lowest iteration that called the Break statement. |
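The rows of this table can be reproduced with Parallel.For and a ParallelLoopState. In this sketch (RunWithBreak and RunWithStop are hypothetical helpers), calling Break in iteration 500 still lets every lower iteration run, so LowestBreakIteration ends up as 500; calling Stop leaves LowestBreakIteration without a value. In both cases IsCompleted is False.

```vbnet
Imports System
Imports System.Threading.Tasks

Module BreakStopSketch
    ' Runs 1,000 iterations and calls Break in iteration breakAt
    Function RunWithBreak(breakAt As Integer) As ParallelLoopResult
        Return Parallel.For(0, 1000,
            Sub(i As Integer, loopState As ParallelLoopState)
                If i = breakAt Then loopState.Break()
            End Sub)
    End Function

    ' Runs 1,000 iterations and calls Stop in iteration stopAt
    Function RunWithStop(stopAt As Integer) As ParallelLoopResult
        Return Parallel.For(0, 1000,
            Sub(i As Integer, loopState As ParallelLoopState)
                If i = stopAt Then loopState.Stop()
            End Sub)
    End Function

    Sub Main()
        Dim broken = RunWithBreak(500)
        ' IsCompleted = False, LowestBreakIteration.HasValue = True
        Console.WriteLine("Break: IsCompleted={0}, LowestBreakIteration={1}",
                          broken.IsCompleted, broken.LowestBreakIteration)
        Dim stopped = RunWithStop(500)
        ' IsCompleted = False, LowestBreakIteration.HasValue = False
        Console.WriteLine("Stop: IsCompleted={0}, HasValue={1}",
                          stopped.IsCompleted, stopped.LowestBreakIteration.HasValue)
    End Sub
End Module
```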
As many iterations run in parallel, many exceptions can occur in parallel. The classic exception management techniques used in sequential code aren't useful with parallel loops.
When the code inside the delegate that is called in each parallelized iteration throws an exception that isn't caught inside the delegate, that exception becomes part of a set of exceptions wrapped in an instance of System.AggregateException, a class introduced in .NET Framework 4.
You have already learned how to handle exceptions in your sequential code in Chapter 6: “Exception Handling and Debugging.” You can apply almost the same techniques. The only difference is when an exception is thrown inside the loop body, which is a delegate. The following code (code file: Listing10.sln) shows a new version of the ParallelForEachGenerateMD5Hashes subroutine, called ParallelForEachGenerateMD5HashesExceptions. Now, the body throws a TimeoutException if the elapsed time is more than three seconds:
If (sw.Elapsed.Seconds > 3) Then
    Throw New TimeoutException(
        "Parallel.ForEach is taking more than 3 seconds to complete.")
End If

Sub ParallelForEachGenerateMD5HashesExceptions()
    Dim sw = Stopwatch.StartNew()
    Dim inputData = GenerateMD5InputData()
    Dim loopResult As ParallelLoopResult
    Try
        loopResult = Parallel.ForEach(inputData,
            Sub(number As Integer, loopState As ParallelLoopState)
                'If loopState.ShouldExitCurrentIteration Then
                '    Exit Sub
                'End If
                Dim md5M As MD5 = MD5.Create()
                Dim data = Encoding.Unicode.GetBytes(number.ToString())
                Dim result = md5M.ComputeHash(data)
                Dim hexString = ConvertToHexString(result)
                If (sw.Elapsed.Seconds > 3) Then
                    Throw New TimeoutException(
                        "Parallel.ForEach is taking more than 3 seconds to complete.")
                End If
                ' Console.WriteLine(ConvertToHexString(result))
            End Sub)
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something considering the innerEx Exception
        Next
    End Try
    DisplayParallelLoopResult(loopResult)
    Debug.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub
A Try…Catch…End Try block encloses the call to Parallel.ForEach. Nevertheless, the line that catches the exceptions is
Catch ex As AggregateException
instead of the classic
Catch ex As Exception
An AggregateException contains one or more exceptions that occurred during the execution of parallel and concurrent code. However, this class isn't specific to parallel computing; it can be used to represent one or more errors that occur during application execution. Once it is caught, you can iterate through each individual exception contained in its InnerExceptions read-only collection of Exception. In this case, the Parallel.ForEach without the custom partitioner will display the contents of many exceptions. The loop result will look as if the loop had been ended by calling the Stop method. However, because it is possible to catch the AggregateException, you can make decisions based on the problems that made it impossible to complete all the iterations. In this case, a sequential For Each loop retrieves the information about each Exception in InnerExceptions. The following code (code file: Listing11.sln) converts each exception to a string and sends it to the Debug output; the sample output after it shows the first two exceptions:
Catch ex As AggregateException
    For Each innerEx As Exception In ex.InnerExceptions
        Debug.WriteLine(innerEx.ToString())
        ' Do something considering the innerEx Exception
    Next
End Try
The output looks like this:
System.TimeoutException: Parallel.ForEach is taking more than 3 seconds to complete.
   at ConsoleApplication3.Module1._Closure$__2._Lambda$__9(Int32 number, ParallelLoopState loopState) in C:\Users\Public\Documents\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 255
   at System.Threading.Tasks.Parallel.<>c__DisplayClass32`2.<PartitionerForEachWorker>b__30()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )
System.TimeoutException: Parallel.ForEach is taking more than 3 seconds to complete.
   at ConsoleApplication3.Module1._Closure$__2._Lambda$__9(Int32 number, ParallelLoopState loopState) in C:\Users\Public\Documents\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 255
   at System.Threading.Tasks.Parallel.<>c__DisplayClass32`2.<PartitionerForEachWorker>b__30()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )
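Besides iterating over InnerExceptions, AggregateException offers a Handle method that calls a predicate for each inner exception and rethrows, in a new AggregateException, only the ones for which the predicate returns False. This sketch uses a hypothetical HandleTimeouts function in which every iteration throws a TimeoutException. How many exceptions end up in InnerExceptions varies from run to run, because the loop stops launching new iterations after the first failure, so the function reports a count rather than a fixed number.

```vbnet
Imports System
Imports System.Threading.Tasks

Module HandleSketch
    ' Returns how many TimeoutException instances were marked as handled.
    ' Any other exception type would be rethrown by Handle.
    Function HandleTimeouts() As Integer
        Dim handled As Integer = 0
        Try
            Parallel.For(0, 100,
                Sub(i As Integer)
                    ' Hypothetical failure: every iteration throws
                    Throw New TimeoutException("Iteration " & i.ToString() & " timed out.")
                End Sub)
        Catch ex As AggregateException
            ' Handle invokes the predicate once per inner exception, on this thread
            ex.Handle(Function(inner)
                          If TypeOf inner Is TimeoutException Then
                              handled += 1
                              Return True ' handled: not rethrown
                          End If
                          Return False ' unhandled: rethrown by Handle
                      End Function)
        End Try
        Return handled
    End Function

    Sub Main()
        Console.WriteLine("Handled {0} timeout(s)", HandleTimeouts())
    End Sub
End Module
```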
When you work with tasks, they run their code using underlying threads (software threads, scheduled on certain hardware threads, or logical cores). However, there isn't a one-to-one relationship between tasks and threads. This means you aren't creating a new thread each time you create a new task. The CLR creates the necessary threads to support the tasks' execution needs. Of course, this is a simplified view of what goes on when creating tasks.
Synchronizing code running in multiple threads is indeed complex. Thus, a task-based alternative offers an excellent opportunity to leave some synchronization problems behind, especially those regarding work scheduling mechanisms. The CLR uses work-stealing queues to reduce lock contention and to schedule small work chunks without adding significant overhead. Creating a new thread introduces a big overhead, whereas creating a new task only queues work that existing threads can pick up, or “steal” from one another, whenever possible. Therefore, tasks offer a new lightweight mechanism for parts of code capable of taking advantage of multiple cores.
The default task scheduler relies on an underlying thread pool engine. Thus, when you create a new task, the scheduler uses the work-stealing queues to find the most appropriate thread to enqueue it on. The thread pool reuses existing threads and injects new ones only when necessary. The code included in a task runs on one thread, but this happens under the hood, and the overhead is smaller than manually creating a new thread.
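You can observe the lack of a one-to-one relationship between tasks and threads by recording the managed thread ID that runs each task. In this sketch, CountDistinctThreads is a hypothetical helper; the exact count depends on the hardware and on thread pool heuristics, but it is typically much smaller than the number of tasks.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module TasksThreadsSketch
    ' Starts taskCount short tasks and returns how many distinct managed threads ran them
    Function CountDistinctThreads(taskCount As Integer) As Integer
        Dim threadIds As New ConcurrentDictionary(Of Integer, Boolean)()
        Dim tasks(taskCount - 1) As Task
        For i As Integer = 0 To taskCount - 1
            tasks(i) = Task.Factory.StartNew(
                Sub()
                    threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, True)
                    Thread.SpinWait(10000) ' a small amount of CPU-bound work
                End Sub)
        Next
        Task.WaitAll(tasks)
        Return threadIds.Count
    End Function

    Sub Main()
        Console.WriteLine("1,000 tasks ran on {0} thread(s)", CountDistinctThreads(1000))
    End Sub
End Module
```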
So far, TPL has been creating instances of System.Threading.Tasks.Task under the hood in order to support the parallel execution of the iterations. In addition, calling Parallel.Invoke also creates as many instances of Task as delegates are called.
A Task represents an asynchronous operation. It offers many methods and properties that enable you to control its execution and get information about its status. The creation of a Task is independent of its execution. This means that you have complete control over the execution of the associated operation.
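The following sketch shows that creating a Task and running it are separate steps. CreateAnswerTask and the Ran flag are hypothetical names: the task reports TaskStatus.Created and has not run its body until Start is called.

```vbnet
Imports System
Imports System.Threading.Tasks

Module TaskCreationSketch
    ' Set by the task body, so it shows whether the body has run
    Public Ran As Boolean = False

    ' Creates, but does not start, a task that computes 6 * 7
    Function CreateAnswerTask() As Task(Of Integer)
        Return New Task(Of Integer)(
            Function()
                Ran = True
                Return 6 * 7
            End Function)
    End Function

    Sub Main()
        Dim t = CreateAnswerTask()
        Console.WriteLine("{0}, ran = {1}", t.Status, Ran) ' Created, ran = False
        t.Start() ' schedule it on the default task scheduler
        t.Wait()  ' block until it finishes
        Console.WriteLine("{0}, result = {1}", t.Status, t.Result) ' RanToCompletion, result = 42
    End Sub
End Module
```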
describes the main properties offered by the Task class.
Property | Description |
AsyncState | A state Object supplied when you created the Task instance. |
CreationOptions | The TaskCreationOptions enum value used to provide hints to the task scheduler in order to help it make the best scheduling decisions. |
CurrentId | The unique ID for the Task being executed. It is not equivalent to a thread ID in unmanaged code. |
Exception | The AggregateException that caused the Task to end prematurely. It is Nothing (null) if the Task hasn't thrown any unhandled exceptions. |
Factory | Provides access to the factory methods that allow the creation of Task instances with and without results |
Id | The unique ID for the Task instance |
IsCanceled | A Boolean value indicating whether the Task instance was canceled |
IsCompleted | A Boolean value indicating whether the Task has completed its execution |
IsFaulted | A Boolean value indicating whether the Task has aborted its execution due to an unhandled exception |
Status | The TaskStatus value indicating the current stage in the life cycle of a Task instance |
It is very important to understand that each Task instance has a life cycle. However, it represents concurrent code potentially running in parallel according to the possibilities offered by the underlying hardware and the availability of resources at run time. Therefore, any information about the Task instance could change as soon as you retrieve it, because its states are changing concurrently.
A Task instance completes its life cycle just once. After it reaches one of its three possible final states, it doesn't go back to any previous state, as shown in the state diagram in .
A Task instance has three possible initial states, depending on how it was created, as described in .
Value | Description |
TaskStatus.Created | A Task instance created using the Task constructor has this initial state. It will change once there is a call to either Start or RunSynchronously, or if the task is canceled. |
TaskStatus.WaitingForActivation | This is the initial state for tasks created through methods that allow the definition of continuations—that is, tasks that aren't scheduled until other dependent tasks finish their execution. |
TaskStatus.WaitingToRun | This is the initial state for a task created through TaskFactory.StartNew. It is waiting for the specified scheduler to pick it up and run it. |
Next, the task status can transition to the TaskStatus.Running state and then move to a final state. If the task has attached child tasks, it isn't considered complete when its own body finishes. Instead, it transitions to the TaskStatus.WaitingForChildrenToComplete state. Once its child tasks complete, the task moves to one of the three possible final states shown in the following table.
Value | Description |
TaskStatus.Canceled | A cancellation request arrived before the task started its execution or during it. The IsCanceled property will be True. |
TaskStatus.Faulted | An unhandled exception in its body or in the bodies of its children made the task end. The IsFaulted property will be True, and the Exception property will not be Nothing; it will hold the AggregateException that caused the task or its children to end prematurely. |
TaskStatus.RanToCompletion | The task completed its execution. It ran to the end of its body without being canceled or throwing an unhandled exception. The IsCompleted property will be True, and IsCanceled and IsFaulted will both be False. |
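The following minimal console sketch makes the three final states concrete. It drives one task into each state and then prints the properties described in the previous tables. The module and function names are illustrative and don't come from the chapter's listings. Note that IsCompleted is True in all three cases.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module FinalStatesDemo
    ' Creates one task per final state, waits for all of them,
    ' and returns them in creation order
    Function RunThreeTasks() As Task()
        ' Ends normally: RanToCompletion
        Dim okTask = Task.Factory.StartNew(Sub() Thread.Sleep(10))

        ' Throws an unhandled exception: Faulted
        Dim faultedTask = Task.Factory.StartNew(
            Sub()
                Throw New InvalidOperationException("Simulated failure")
            End Sub)

        ' The token is already canceled before the task is scheduled: Canceled
        Dim cts As New CancellationTokenSource()
        cts.Cancel()
        Dim canceledTask = Task.Factory.StartNew(
            Sub() cts.Token.ThrowIfCancellationRequested(), cts.Token)

        Dim tasks = New Task() {okTask, faultedTask, canceledTask}
        Try
            Task.WaitAll(tasks)
        Catch ex As AggregateException
            ' Expected: WaitAll reports the fault and the cancellation together
            Console.WriteLine("WaitAll reported {0} inner exceptions.", ex.InnerExceptions.Count)
        End Try
        Return tasks
    End Function

    Sub Main()
        For Each t In RunThreeTasks()
            Console.WriteLine("{0}: IsCompleted={1}, IsFaulted={2}, IsCanceled={3}",
                              t.Status, t.IsCompleted, t.IsFaulted, t.IsCanceled)
        Next
    End Sub
End Module
```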
In a previous example, you used Parallel.Invoke to launch two subroutines in parallel:
Parallel.Invoke(Sub() GenerateAESKeys(), Sub() GenerateMD5Hashes())
It is possible to do the same job using two Task instances, as shown in the following code (code file: Listing13.sln). Working with Task instances offers more flexibility to schedule and start independent and chained tasks that can take advantage of multiple cores.
' Create the tasks
Dim t1 = New Task(Sub() GenerateAESKeys())
Dim t2 = New Task(Sub() GenerateMD5Hashes())
' Start the tasks
t1.Start()
t2.Start()
' Wait for all the tasks to finish
Task.WaitAll(t1, t2)
The first two lines create two instances of Task with a lambda expression to create a delegate for GenerateAESKeys and GenerateMD5Hashes. t1 is associated with the first subroutine, and t2 with the second. It is also possible to use multiline lambda expression syntax to define the action that the Task constructor receives as a parameter. At this point, the Status for both Task instances is TaskStatus.Created. The subroutines aren't running yet, but the code continues with the next line.
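The following self-contained sketch shows the Created state before Start and the RanToCompletion state after Task.WaitAll. SumTo is a hypothetical stand-in for GenerateAESKeys and GenerateMD5Hashes, which are defined in the chapter's code files.

```vb
Imports System
Imports System.Threading.Tasks

Module TaskLifecycleDemo
    ' Hypothetical CPU-bound stand-in for the chapter's generators
    Function SumTo(n As Integer) As Long
        Dim total As Long = 0
        For i As Integer = 1 To n
            total += i
        Next
        Return total
    End Function

    Sub Main()
        Dim sum1 As Long = 0
        Dim sum2 As Long = 0
        ' The constructor only creates the tasks; nothing runs yet
        Dim t1 = New Task(Sub() sum1 = SumTo(1000))
        Dim t2 = New Task(Sub() sum2 = SumTo(2000))
        Console.WriteLine("Before Start: {0}, {1}", t1.Status, t2.Status) ' Created, Created

        ' Start schedules each delegate and returns immediately
        t1.Start()
        t2.Start()

        ' Block the main thread until both tasks reach a final state
        Task.WaitAll(t1, t2)
        Console.WriteLine("After WaitAll: {0}, {1}", t1.Status, t2.Status)
        Console.WriteLine("Results: {0}, {1}", sum1, sum2)
    End Sub
End Module
```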
Then, the following line starts the asynchronous execution of t1:
t1.Start()
The Start method initiates the execution of the delegate in an independent way, and the program flow continues with the instruction after this method, even though the delegate has not finished its execution. The code in the delegate associated with the task runs concurrently and potentially in parallel with the main program flow, the main thread. This means that at this point, there is a main thread and another thread or threads supporting the execution of this new task.
The execution of the main program flow, the main thread, is synchronous. This means that it will continue with the next instruction, the line that starts the asynchronous execution of t2:
t2.Start()
Now the Start method initiates the execution of the delegate in another independent way and the program flow continues with the instruction after this method, even though this other delegate has not finished its execution. The code in the delegate associated with the task runs concurrently and potentially in parallel with the main thread and the code inside GenerateAESKeys that is already running. This means that at this point, there is a main thread and other threads supporting the execution of the two tasks.
The sequence diagram in shows the parallel and asynchronous execution flow for the main thread and the two tasks.
The Visual Basic 2012 IDE improves the two debugging windows introduced in Visual Basic 2010: Parallel Tasks and Parallel Stacks. These windows offer information about the tasks that are running, including their status and their relationship with the underlying threads. It is very easy to understand the relationship between the tasks and the underlying threads in .NET Framework 4.5 when debugging simple parallelized code and running it step-by-step using the debugging windows, which enable you to monitor what is going on under the hood. By running the code step-by-step, you can see the differences between synchronous and asynchronous execution.
For example, if you insert a breakpoint on the line Task.WaitAll(t1, t2) and your microprocessor has at least two cores, you will be able to see two tasks running in parallel while debugging. To do so, select Debug ⇒ Windows ⇒ Parallel Tasks (Ctrl + Shift + D, K) while debugging the application. The IDE will display the Parallel Tasks window shown in , which includes a list of all the tasks and their status (Active).
There are two tasks:
Each of the two tasks is assigned to a different thread. The status for both tasks is Running, and they are identified by an auto-generated lambda name and number, <lambda10>() and <lambda11>(). This happens because the code uses lambda expressions to generate the delegates associated with each task.
If you double-click on a task name, the IDE will display the next statement that is going to run for the selected task. Remember that the threads assigned to these tasks and the main thread are running concurrently and potentially on different logical cores, according to the available hardware resources and the decisions made by the schedulers.
You can check what is going on with each different concurrent task. You have similar options to those offered by previous Visual Basic versions with threads, but the information is better because you can check whether a task is scheduled or waiting-deadlocked. You can also order and group the information shown in the windows, as you can with any other Visual Basic IDE feature.
The Parallel Tasks grid includes a column named Thread Assignment. This number is the ID shown in the Threads window. Thus, you know which managed thread is supporting the execution of a certain task while debugging. You can also check the next statement and additional detailed information for each different thread. To do so, select Debug ⇒ Windows ⇒ Threads (Ctrl + Alt + H) while debugging the application. The IDE will display the Threads dialog shown in , which includes a list of all the threads, their category, and their locations.
The two threads on the right side of the diagram are running the code scheduled by the two tasks. Each thread shows its call stack. The thread that supports Module1.<lambda10> is running the GenerateAESKeys subroutine — specifically, code inside the call to the ConvertToHexString subroutine. The thread that supports Module1.<lambda11> is running the GenerateMD5Hashes subroutine. This diagram indicates what each thread is doing with a great level of detail.
You can change the value for the combo box in the upper-left corner from Threads to Tasks, and the IDE will display a diagram with all the tasks, including their status, relationships, and the call stack, as shown in .
At some point, you need to wait for certain tasks, started with an asynchronous execution, to finish. The following line calls the Task.WaitAll method, which will wait for the Task instances received as a ParamArray, separated by commas. This method has a synchronous execution, which means that the main thread won't continue with the next statement until the Task instances received as parameters finish their execution.
Task.WaitAll(t1, t2)
Here, t1 and t2 have to finish their execution. The current thread, in this case the main thread, waits until both tasks finish. Note that this waiting is not a loop that continuously checks a status and consumes a lot of CPU cycles. The WaitAll method uses a lightweight mechanism that reduces CPU usage as much as possible while waiting. Once both tasks finish their execution, the next statement runs.
Because the WaitAll method blocks, if the tasks take one minute to run, then the thread where this method was called (in this case, the main thread) will wait for that amount of time. Therefore, you sometimes want to limit the number of milliseconds to wait for the tasks to finish. You can use another overload of the Task.WaitAll method that accepts an array of Task instances and the number of milliseconds to wait. The method returns a Boolean value indicating whether the tasks were able to finish within the specified timeout. The following code waits for t1 and t2 to finish their execution with a three-second timeout (code file: Snippet03.sln):
If Task.WaitAll(New Task() {t1, t2}, 3000) = False Then
    Console.WriteLine(
        "GenerateAESKeys and GenerateMD5Hashes are taking more than 3 seconds to complete.")
    Console.WriteLine(t1.Status.ToString())
    Console.WriteLine(t2.Status.ToString())
End If
If t1 and t2 don't finish within three seconds, the code displays a message and the status of both tasks. If no exceptions occurred in the code for these tasks, they could still be running. The Task.WaitAll overload with a timeout doesn't cancel the tasks if they take longer to run; it just returns the Boolean result.
It is also possible to call the Wait method for a Task instance. In this case, the current thread will wait until that task finishes its execution. Of course, there is no need to send the task instance as a parameter because the Wait method is an instance method. The Task.Wait method also supports a timeout in one of its definitions. The following code waits for t1 to finish, and if it doesn't complete its work in three seconds, it displays a message and its status (code file: Snippet04.sln):
If t1.Wait(3000) = False Then
    Console.WriteLine("GenerateAESKeys is taking more than 3 seconds to complete.")
    Console.WriteLine(t1.Status.ToString())
End If
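Here is a minimal sketch of both timeout overloads, using Thread.Sleep as a stand-in for the real work. When the timeout expires, WaitAll returns False, and the tasks keep running afterward because a timeout never cancels them. The WaitWithTimeout helper is hypothetical.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module TimeoutDemo
    ' Hypothetical helper: starts two tasks that each sleep workMs milliseconds,
    ' waits at most timeoutMs for both, then waits without a limit
    Function WaitWithTimeout(workMs As Integer, timeoutMs As Integer) As Boolean
        Dim t1 = Task.Factory.StartNew(Sub() Thread.Sleep(workMs))
        Dim t2 = Task.Factory.StartNew(Sub() Thread.Sleep(workMs))

        Dim finished = Task.WaitAll(New Task() {t1, t2}, timeoutMs)
        If Not finished Then
            ' The timeout does not cancel anything: the tasks keep running
            Console.WriteLine("Timed out; statuses: {0}, {1}", t1.Status, t2.Status)
        End If

        ' The instance method Wait also accepts a timeout in milliseconds
        If Not t1.Wait(10000) Then
            Console.WriteLine("t1 still not finished after 10 seconds")
        End If
        t2.Wait()
        Console.WriteLine("Final statuses: {0}, {1}", t1.Status, t2.Status)
        Return finished
    End Function

    Sub Main()
        ' Two-second tasks with a 100 ms timeout: WaitAll returns False
        Console.WriteLine("Finished within timeout: {0}", WaitWithTimeout(2000, 100))
    End Sub
End Module
```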
You can interrupt the execution of Task instances by using cancellation tokens. To do so, you must add some code to the delegate to create a cancelable operation that is capable of terminating in a timely manner.
The following code (code file: Listing14.sln) shows two new versions of the AES keys and MD5 hash generators. The changes made in order to support cancellation appear in bold. The new GenerateAESKeysCancel, replacing the old GenerateAESKeys, receives a System.Threading.CancellationToken instance and throws an OperationCanceledException calling the ThrowIfCancellationRequested method. This way, the Task instance transitions to the TaskStatus.Canceled state and the IsCanceled property will be True.
Private Sub GenerateAESKeysCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine("AES: " + ConvertToHexString(result))
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Private Sub GenerateMD5HashesCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim md5M As MD5 = MD5.Create()
    For i As Integer = 1 To NUM_MD5_HASHES
        Dim data = Encoding.Unicode.GetBytes(i.ToString())
        Dim result = md5M.ComputeHash(data)
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine(ConvertToHexString(result))
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token
    Dim t1 = Task.Factory.StartNew(Sub() GenerateAESKeysCancel(ct), ct)
    Dim t2 = Task.Factory.StartNew(Sub() GenerateMD5HashesCancel(ct), ct)
    ' Sleep the main thread for 1 second
    Threading.Thread.Sleep(1000)
    cts.Cancel()
    Try
        If Task.WaitAll(New Task() {t1, t2}, 1000) = False Then
            Console.WriteLine(
                "GenerateAESKeys and GenerateMD5Hashes are taking more than 1 second to complete.")
            Console.WriteLine(t1.Status.ToString())
            Console.WriteLine(t2.Status.ToString())
        End If
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
    End Try
    If t1.IsCanceled Then
        Console.WriteLine("The task running GenerateAESKeysCancel was canceled.")
    End If
    If t2.IsCanceled Then
        Console.WriteLine(
            "The task running GenerateMD5HashesCancel was canceled.")
    End If
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub
The first line of GenerateAESKeysCancel will throw the aforementioned exception if its cancellation was already requested at that time. This way, it won't start the loop if unnecessary at that point.
ct.ThrowIfCancellationRequested()
In addition, after each iteration of the loop, new code checks the token's IsCancellationRequested. If it is True, it calls the ThrowIfCancellationRequested method. Before calling this method, when IsCancellationRequested is True, it is possible to add cleanup code when necessary:
If ct.IsCancellationRequested Then
    ' It is important to add cleanup code here when necessary
    ct.ThrowIfCancellationRequested()
End If
Doing this adds a small amount of overhead to each iteration of the loop. When the delegate throws an OperationCanceledException, the Task instance compares the exception's token with its own associated token. If they are the same and the token's IsCancellationRequested property is True, the Task instance understands that the cancellation request was acknowledged. It then transitions to the Canceled state and interrupts its execution. Any code waiting for the canceled Task instance receives a TaskCanceledException wrapped in an AggregateException.
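The cooperative cancellation pattern described above can be sketched in a standalone module. CountUntilCanceled is a hypothetical loop standing in for the key and hash generators. The key detail is that the same token is passed both to the delegate and to StartNew. That way, the task recognizes the OperationCanceledException as an acknowledgment and ends in the Canceled state.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module CancellationDemo
    ' Hypothetical workload that loops until cancellation is requested
    Sub CountUntilCanceled(ct As CancellationToken)
        ' Don't start at all if cancellation was already requested
        ct.ThrowIfCancellationRequested()
        Do
            Thread.SpinWait(1000) ' simulated work
            If ct.IsCancellationRequested Then
                ' Cleanup code would go here before acknowledging the request
                ct.ThrowIfCancellationRequested()
            End If
        Loop
    End Sub

    Function RunAndCancel() As Task
        Dim cts As New CancellationTokenSource()
        Dim ct = cts.Token
        ' Passing ct to StartNew lets the task match the exception's token with its own
        Dim t = Task.Factory.StartNew(Sub() CountUntilCanceled(ct), ct)
        Thread.Sleep(200)
        cts.Cancel()
        Try
            t.Wait()
        Catch ex As AggregateException
            For Each inner In ex.InnerExceptions
                ' Waiting on a canceled task surfaces a TaskCanceledException
                Console.WriteLine(inner.GetType().Name)
            Next
        End Try
        Return t
    End Function

    Sub Main()
        Dim t = RunAndCancel()
        Console.WriteLine("Status: {0}, IsCanceled: {1}", t.Status, t.IsCanceled)
    End Sub
End Module
```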
In this case, the Main subroutine creates a CancellationTokenSource, cts, and a CancellationToken, ct:
Dim cts As New System.Threading.CancellationTokenSource()
Dim ct As System.Threading.CancellationToken = cts.Token
CancellationTokenSource initiates cancellation requests, and CancellationToken communicates those requests to asynchronous operations.
It is necessary to send a CancellationToken as a parameter to each task delegate; therefore, the code uses one of the definitions of the TaskFactory.StartNew method. The following lines create and start two Task instances with associated actions and the same CancellationToken instance (ct) as parameters:
Dim t1 = Task.Factory.StartNew(Sub() GenerateAESKeysCancel(ct), ct)
Dim t2 = Task.Factory.StartNew(Sub() GenerateMD5HashesCancel(ct), ct)
The preceding lines use the Task class Factory property to retrieve a TaskFactory instance that can be used to create tasks with more options than those offered by direct instantiation of the Task class. In this case, it uses the StartNew method, which is functionally equivalent to creating a Task using one of its constructors and then calling Start to schedule it for execution.
Then, the code calls the Sleep method to make the main thread sleep for one second. This method suspends the current thread for the indicated time—in this case, specified as an Integer in milliseconds:
Threading.Thread.Sleep(1000)
One second later, the main thread communicates a request for cancellation for both tasks through the CancellationTokenSource instance's Cancel method:
cts.Cancel()
The cancellation token is evaluated in the two delegates launched by the Task instances, as previously explained.
With just a few additional lines, it is easy to cancel asynchronous actions. However, it is very important to add the necessary cleanup code.
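A Try…Catch…End Try block encloses the call to Task.WaitAll. Because cancellation was requested for both tasks while they were running, waiting for them throws an AggregateException. Its InnerExceptions collection holds two benign TaskCanceledException instances; TaskCanceledException derives from OperationCanceledException.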
The IsCanceled property for both tasks is going to be True. Checking this property, you can add code whenever a task was canceled.
As many tasks run in parallel, many exceptions can occur in parallel. Task instances also work with a set of exceptions, handled by the previously explained System.AggregateException class.
The following code (code file: Listing15.sln) shows the highlighted lines that add an unhandled exception in the GenerateAESKeysCancel subroutine.
Comment out the line that requested cancellation for both tasks:
' cts.Cancel()

Private Sub GenerateAESKeysCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine("AES: " + ConvertToHexString(result))
        ' TotalSeconds holds the whole elapsed time as a Double;
        ' Seconds would only return the integer seconds component
        If sw.Elapsed.TotalSeconds > 0.5 Then
            Throw New TimeoutException(
                "GenerateAESKeysCancel is taking more than 0.5 seconds to complete.")
        End If
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub
Add the following lines to the Main subroutine:
If t1.IsFaulted Then
    For Each innerEx As Exception In t1.Exception.InnerExceptions
        Debug.WriteLine(innerEx.ToString())
        ' Do something else considering the innerEx Exception
    Next
End If
Because there is an unhandled exception in t1, its IsFaulted property is True. Therefore, t1.Exception, an AggregateException, contains one or more exceptions that occurred during the execution of its associated delegate. After checking the IsFaulted property, it is possible to iterate through each individual exception contained in the InnerExceptions read-only collection of Exception. You can make decisions according to the problems that made it impossible to complete the task. The following lines show the information about the unhandled exception converted to a string and sent to the Debug output.
System.TimeoutException: GenerateAESKeysCancel is taking more than 0.5 seconds to complete.
   at ConsoleApplication3.Module1.GenerateAESKeysCancel(CancellationToken ct) in C:\Wrox\Professional_VB_2012\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 427
   at ConsoleApplication3.Module1._Closure$____3._Lambda$____11() in C:\Wrox\Professional_VB_2012\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 337
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.Execute()
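You don't have to iterate through InnerExceptions manually. AggregateException also provides a Handle method, which runs a predicate for each inner exception and rethrows any exceptions that the predicate doesn't mark as handled. Here is a minimal sketch, with SlowWorker as a hypothetical delegate that always throws a TimeoutException.

```vb
Imports System
Imports System.Threading.Tasks

Module FaultHandlingDemo
    ' Hypothetical worker that always fails with a timeout
    Sub SlowWorker()
        Throw New TimeoutException("SlowWorker took too long.")
    End Sub

    Function RunAndHandle() As Integer
        Dim t1 = Task.Factory.StartNew(Sub() SlowWorker())
        Dim handledCount As Integer = 0
        Try
            t1.Wait()
        Catch ex As AggregateException
            ' Handle calls the predicate for each inner exception; any exception
            ' for which it returns False is rethrown in a new AggregateException
            ex.Handle(Function(inner)
                          If TypeOf inner Is TimeoutException Then
                              Console.WriteLine("Handled: " & inner.Message)
                              handledCount += 1
                              Return True
                          End If
                          Return False
                      End Function)
        End Try
        Console.WriteLine("IsFaulted: {0}", t1.IsFaulted)
        Return handledCount
    End Function

    Sub Main()
        RunAndHandle()
    End Sub
End Module
```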
So far, task instances did not return values; they were delegates running subroutines. However, it is also possible to return values from tasks, invoking functions and using Task(Of TResult) instances, where TResult has to be replaced by the returned type.
The following code (code file: Listing17.sln) shows a new function that generates AES keys, as in the previous examples, and returns a list of the keys that begin with the character received in the prefix parameter. GenerateAESKeysWithCharPrefix returns a List(Of String).
The Main subroutine uses the definition of the TaskFactory.StartNew method, but this time it calls it from a Task(Of TResult) instance and not a Task instance. Specifically, it creates a Task(Of List(Of String)) instance, sending it a CancellationToken as a parameter to the task delegate:
Dim t1 = Task(Of List(Of String)).Factory.StartNew(
    Function() GenerateAESKeysWithCharPrefix(ct, "A"c), ct)
The delegate is a function that returns a List(Of String). This list becomes available in the Task(Of TResult) instance (t1) through its Result property after the associated delegate completes its execution and the function returns a value. If the task hasn't finished yet, reading Result blocks the calling thread until it does.
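The following sketch isolates the Result property. WordsStartingWith is a hypothetical, fast stand-in for GenerateAESKeysWithCharPrefix. Reading Result on a faulted or canceled task throws an AggregateException, which is why the chapter's Main checks IsCanceled and IsFaulted before using the result.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks

Module TaskResultDemo
    ' Hypothetical stand-in for GenerateAESKeysWithCharPrefix
    Function WordsStartingWith(prefix As Char) As List(Of String)
        Dim words = {"Apple", "Banana", "Avocado", "Cherry", "Apricot"}
        Dim result As New List(Of String)
        For Each w In words
            If w(0) = prefix Then result.Add(w)
        Next
        Return result
    End Function

    Function GetWords() As List(Of String)
        Dim t1 = Task(Of List(Of String)).Factory.StartNew(
            Function() WordsStartingWith("A"c))
        ' Reading Result blocks until the task has finished
        Return t1.Result
    End Function

    Sub Main()
        For Each w In GetWords()
            Console.WriteLine(w)
        Next
    End Sub
End Module
```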
The main thread waits for t1 to finish and then checks whether it completed its execution, checking the previously explained Task instance properties. Then, it iterates through each string in the list, returned by the function called in the previous task, and displays the results on the console. It does this job running a new asynchronous task, t2 (code file: Listing17.sln).
Private Function GenerateAESKeysWithCharPrefix(
    ByVal ct As System.Threading.CancellationToken,
    ByVal prefix As Char) As List(Of String)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    Dim keysList As New List(Of String)
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        If Left(hexString, 1) = prefix Then
            keysList.Add(hexString)
        End If
        If ct.IsCancellationRequested Then
            ' It is important to add cleanup code here
            ct.ThrowIfCancellationRequested()
        End If
    Next
    ' Display the elapsed time before returning; code after Return never runs
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
    Return keysList
End Function

Sub Main()
    Dim sw = Stopwatch.StartNew()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token
    Dim t1 = Task(Of List(Of String)).Factory.StartNew(
        Function() GenerateAESKeysWithCharPrefix(ct, "A"c), ct)
    Try
        t1.Wait()
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
    End Try
    If t1.IsCanceled Then
        Console.WriteLine(
            "The task running GenerateAESKeysWithCharPrefix was canceled.")
        Exit Sub
    End If
    If t1.IsFaulted Then
        For Each innerEx As Exception In t1.Exception.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
        Exit Sub
    End If
    Dim t2 = Task.Factory.StartNew(Sub()
                                       ' Do something with the result
                                       ' returned by the task's delegate
                                       For i As Integer = 0 To t1.Result.Count - 1
                                           Console.WriteLine(t1.Result(i))
                                       Next
                                   End Sub, TaskCreationOptions.LongRunning)
    ' Wait for the user to press a key while t2 is displaying the results
    Console.ReadLine()
End Sub
The code creates and starts the second task, t2, using the StartNew method and multiline lambda expression syntax. However, in this case, it uses a different overload that receives a TaskCreationOptions parameter, which specifies flags that control optional behavior for the creation, scheduling, and execution of tasks.
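The following table describes four members of the TaskCreationOptions enumeration. .NET Framework 4.5 adds two more members, DenyChildAttach and HideScheduler, which are not covered here.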
Value | Description |
TaskCreationOptions.AttachedToParent | The task is attached to a parent task. You can create tasks inside other tasks. |
TaskCreationOptions.None | The task can use the default behavior. |
TaskCreationOptions.LongRunning | The task will take a long time to run. Therefore, the scheduler can work with it as a coarse-grained operation. You can use this option if the task is likely to take many seconds to run. It is not advisable to use this option when a task takes less than one second to run. |
TaskCreationOptions.PreferFairness | This option tells the scheduler that tasks scheduled sooner should be run sooner and tasks scheduled later should be run later. |
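The following short sketch shows two of these options, with Thread.Sleep standing in for real work. A child created with AttachedToParent keeps its parent from completing until the child finishes, and LongRunning is passed only as a scheduling hint. Tasks created through Task.Factory.StartNew allow children to attach by default. The module and function names are illustrative.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module CreationOptionsDemo
    ' Returns True if the attached child finished before parent.Wait() returned
    Function RunParentWithChild() As Boolean
        Dim childFinished As Boolean = False
        Dim parent = Task.Factory.StartNew(
            Sub()
                ' The child is attached, so the parent cannot complete before it
                Task.Factory.StartNew(
                    Sub()
                        Thread.Sleep(500)
                        childFinished = True
                    End Sub, TaskCreationOptions.AttachedToParent)
                ' The parent's own body returns right away
            End Sub)
        parent.Wait()
        Return childFinished
    End Function

    Sub Main()
        ' LongRunning is only a hint that this task will block for a long time
        Dim worker = Task.Factory.StartNew(Sub() Thread.Sleep(1000),
                                           TaskCreationOptions.LongRunning)
        Console.WriteLine("Child finished before parent completed: {0}",
                          RunParentWithChild())
        worker.Wait()
    End Sub
End Module
```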
Clearly, the previous case shows an example of chained tasks. Task t1 produces a result, and t2 needs it as an input in order to start processing it. In these cases, instead of adding many lines that check for the successful completion of a precedent task and then schedule a new task, TPL enables you to chain tasks using continuations.
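You can call the ContinueWith method for any task instance to create a continuation that executes when this task (the antecedent) completes its execution. By default, the continuation runs whether the antecedent ran to completion, faulted, or was canceled. To restrict this, use the overloads that receive a TaskContinuationOptions value, such as OnlyOnRanToCompletion. ContinueWith has many overloads. The simplest receives an action, as when creating Task instances, and that action receives the antecedent task as its parameter.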
The following lines show a simplified version of the code used in the previous example to display the results generated by t1 (code file: Snippet05.sln):
Dim t1 = Task(Of List(Of String)).Factory.StartNew(
    Function() GenerateAESKeysWithCharPrefix(ct, "A"c), ct)
Dim t2 = t1.ContinueWith(Sub(t)
                             ' Do something with the result returned
                             ' by the task's delegate
                             ' (t.Result throws an AggregateException if t1 faulted or was canceled)
                             For i As Integer = 0 To t.Result.Count - 1
                                 Console.WriteLine(t.Result(i))
                             Next
                         End Sub)
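A plain continuation also runs when its antecedent faults or is canceled, so reading t.Result inside it can throw. The following sketch registers two continuations, one with TaskContinuationOptions.OnlyOnRanToCompletion and one with TaskContinuationOptions.OnlyOnFaulted. Exactly one of them runs, and the other is canceled. The module and function names are illustrative.

```vb
Imports System
Imports System.Threading.Tasks

Module ContinuationDemo
    Function Chain(shouldFail As Boolean) As String
        Dim antecedent = Task(Of Integer).Factory.StartNew(
            Function()
                If shouldFail Then Throw New InvalidOperationException("No data")
                Return 21
            End Function)

        ' Runs only if the antecedent ran to completion, so t.Result is safe
        Dim onSuccess = antecedent.ContinueWith(
            Function(t As Task(Of Integer)) "Doubled: " & (t.Result * 2).ToString(),
            TaskContinuationOptions.OnlyOnRanToCompletion)

        ' Runs only if the antecedent faulted; reading t.Exception observes the error
        Dim onFault = antecedent.ContinueWith(
            Function(t As Task(Of Integer)) "Failed: " & t.Exception.InnerException.Message,
            TaskContinuationOptions.OnlyOnFaulted)

        Try
            Task.WaitAll(onSuccess, onFault)
        Catch ex As AggregateException
            ' Expected: the continuation that did not apply was canceled
        End Try
        Return If(onSuccess.Status = TaskStatus.RanToCompletion,
                  onSuccess.Result, onFault.Result)
    End Function

    Sub Main()
        Console.WriteLine(Chain(False)) ' Doubled: 42
        Console.WriteLine(Chain(True))  ' Failed: No data
    End Sub
End Module
```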
It is possible to chain many tasks and then wait for the last task to be executed. However, you have to be careful with the continuous changes in the states when checking their values for all these asynchronous operations. In addition, it is very important to consider all the potential exceptions that could be thrown.
Parallel programming applied to certain complex algorithms is not as simple as shown in the previously explained examples. Sometimes, the differences between a reliable and bug-free parallelized version and its sequential counterpart could reveal an initially unexpected complexity. The code can become too complex, even when taking advantage of the new features offered by TPL. In fact, a complex sequential algorithm is probably going to be a more complex parallel algorithm. Therefore, TPL offers many new data structures for parallel programming that simplify many complex synchronization problems:
The aforementioned data structures were designed to avoid locks wherever possible, and use fine-grained locking when they are necessary on their different shared resources. Locks generate many potential bugs and can significantly reduce scalability. However, sometimes they are necessary because writing lock-free code isn't always possible.
These new data structures enable you to forget about complex lock mechanisms in certain situations because they already include all the necessary lightweight synchronization under the hood. Therefore, it is a good idea to use these data structures whenever possible.
Furthermore, .NET Framework 4.5 offers synchronization primitives for managing and controlling the interactions between different tasks and their underlying threads, including the following operations:
The aforementioned synchronization primitives are advanced topics that require an in-depth analysis to determine the most convenient primitive to apply in a given situation. It is important to use the right synchronization primitive to avoid potential pitfalls, explained in the following list, while still keeping the code scalable.
Many techniques and new debugging tools can simplify the most complex problems, such as the following:
Lists, collections, and arrays are excellent examples of data structures that need complex synchronization management when they are accessed concurrently and in parallel. The classic lists, collections, and arrays are not thread-safe because they aren't prepared to receive concurrent requests to add or remove elements. If you have to write a parallel loop that adds elements in an unordered way into a shared collection, you have to add a synchronization mechanism to make that access thread-safe. Writing an efficient thread-safe collection yourself is a very complex job.
Luckily, TPL offers a new namespace, System.Collections.Concurrent, for dealing with thread-safety issues. As previously explained, this namespace provides access to the custom partitioners for parallelized loops. However, it also offers access to the following collections prepared for concurrency:
It would be difficult to use a classic shared list to add elements from many independent tasks created by the Parallel.ForEach method. You would need to add synchronization code, which would be a great challenge without restricting the overall scalability. However, you can add strings to a shared ConcurrentQueue(Of String) (that is, enqueue strings) inside the parallelized code, because this collection is prepared for adding elements concurrently.
The following code (code file: Listing18.sln) uses a shared ConcurrentQueue(Of String), Keys, to hold the strings that contain the AES keys that begin with a certain prefix, generated in a parallelized loop with the custom partitioner. All the tasks created automatically by Parallel.ForEach call the Enqueue method to add the elements that comply with the condition.
Keys.Enqueue(hexString)
It is indeed simple to work with a ConcurrentQueue. There is no need to worry about synchronization problems because everything is controlled under the hood.
Private Keys As ConcurrentQueue(Of String)

Private Sub ParallelPartitionGenerateAESKeysWCP(
    ByVal ct As System.Threading.CancellationToken,
    ByVal prefix As Char)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim parallelOptions As New ParallelOptions()
    ' Set the CancellationToken for the ParallelOptions instance
    parallelOptions.CancellationToken = ct
    Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), parallelOptions,
        Sub(range)
            Dim aesM As New AesManaged()
            'Debug.WriteLine("Range ({0}, {1}. Time: {2})",
            '    range.Item1, range.Item2, Now().TimeOfDay)
            For i As Integer = range.Item1 To range.Item2 - 1
                aesM.GenerateKey()
                Dim result = aesM.Key
                Dim hexString = ConvertToHexString(result)
                ' Console.WriteLine("AES: " + ConvertToHexString(result))
                If Left(hexString, 1) = prefix Then
                    Keys.Enqueue(hexString)
                End If
                parallelOptions.CancellationToken.ThrowIfCancellationRequested()
            Next
        End Sub)
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token
    Keys = New ConcurrentQueue(Of String)
    Dim tAsync = New Task(Sub() ParallelPartitionGenerateAESKeysWCP(ct, "A"c))
    tAsync.Start()
    ' Do something else
    ' Wait for tAsync to finish
    tAsync.Wait()
    Console.ReadLine()
End Sub
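The same idea works with any parallel loop. This minimal sketch replaces the AES key generation with one hypothetical string per index and lets Parallel.For enqueue into a shared ConcurrentQueue(Of String) without any explicit lock. The final count always matches the number of iterations, although the order of the elements is not guaranteed.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading.Tasks

Module ConcurrentQueueDemo
    Function FillInParallel(count As Integer) As ConcurrentQueue(Of String)
        Dim queue As New ConcurrentQueue(Of String)
        ' Many tasks call Enqueue at the same time; no explicit lock is needed
        Parallel.For(0, count, Sub(i As Integer) queue.Enqueue("Item " & i.ToString()))
        Return queue
    End Function

    Sub Main()
        Dim q = FillInParallel(10000)
        ' Every element arrives exactly once, though not necessarily in index order
        Console.WriteLine("Elements: {0}", q.Count)
    End Sub
End Module
```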
For example, it is possible to run many LINQ queries to display partial statistics while running the task that is adding elements to the ConcurrentQueue (Keys). The following code (code file: Listing19.sln) shows a new Main subroutine that checks whether the task (tAsync) is running or waiting to run, and while this happens it runs a LINQ query to show the number of keys that contain an F in the shared ConcurrentQueue (Keys).
Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token
    Keys = New ConcurrentQueue(Of String)
    Dim tAsync = Task.Factory.StartNew(
        Sub() ParallelPartitionGenerateAESKeysWCP(ct, "A"c))
    Do While (tAsync.Status = TaskStatus.Running) OrElse
             (tAsync.Status = TaskStatus.WaitingToRun)
        ' Display partial results
        Dim countQuery = Aggregate key In Keys Where key.Contains("F") Into Count()
        Console.WriteLine("So far, the number of keys that contain an F is: {0}",
                          countQuery)
        ' Sleep the main thread for 0.5 seconds
        Threading.Thread.Sleep(500)
    Loop
    tAsync.Wait()
    ' Do something else
    Console.ReadLine()
End Sub
Another useful feature is the capability to remove an element at the beginning of the queue in a safe way using its TryDequeue method:
Dim firstKey As String = Nothing
If Keys.TryDequeue(firstKey) Then
    ' firstKey has the first key added to the ConcurrentQueue
Else
    ' It wasn't possible to remove an element from the ConcurrentQueue
End If
TryDequeue returns a Boolean value indicating whether the operation was successful. It returns the removed element through an output parameter, in this case a String passed by reference (firstKey).
It is possible to add and remove elements in different tasks.
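Here is a minimal producer-consumer sketch of that idea, using integers instead of AES keys. One task enqueues, and another dequeues with TryDequeue until the producer has completed and the queue is empty. The consumer spins while it waits, which keeps the example short. BlockingCollection(Of T), in the same namespace, can make the consumer block instead. The function name is illustrative.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading.Tasks

Module ProducerConsumerDemo
    ' Produces 1..itemCount in one task and sums them in another
    Function ProduceAndConsume(itemCount As Integer) As Long
        Dim queue As New ConcurrentQueue(Of Integer)
        Dim producer = Task.Factory.StartNew(
            Sub()
                For i As Integer = 1 To itemCount
                    queue.Enqueue(i)
                Next
            End Sub)

        Dim consumer = Task(Of Long).Factory.StartNew(
            Function()
                Dim total As Long = 0
                Dim item As Integer
                ' Keep going while the producer may still add items or some remain
                Do While Not producer.IsCompleted OrElse Not queue.IsEmpty
                    If queue.TryDequeue(item) Then total += item
                Loop
                Return total
            End Function)

        Return consumer.Result
    End Function

    Sub Main()
        Console.WriteLine("Sum of consumed items: {0}", ProduceAndConsume(1000))
    End Sub
End Module
```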
ConcurrentStack is very similar to the previously explained ConcurrentQueue but it uses different method names to better represent a stack (a LIFO collection). Its most important methods are Push and TryPop.
Push inserts an element at the top of the ConcurrentStack. If Keys were a ConcurrentStack(Of String), the following lines would add hexString at the top of the stack:
If Left(hexString, 1) = prefix Then
    Keys.Push(hexString)
End If
You can remove an element at the top of the stack in a safe way using its TryPop method. However, in this case, the method will return the last element added because it is a stack and not a queue:
Dim firstKey As String = Nothing
If Keys.TryPop(firstKey) Then
    ' firstKey has the last key added to the ConcurrentStack
Else
    ' It wasn't possible to remove an element from the ConcurrentStack
End If
TryPop also returns a Boolean value indicating whether the operation was successful.
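To make the FIFO versus LIFO difference concrete, the following sketch adds the same three keys, in the same order, to a ConcurrentQueue and to a ConcurrentStack, and then removes one element from each. The module and function names (QueueVersusStack, FirstDequeued, FirstPopped) and the sample keys are hypothetical, chosen only for this illustration.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Collections.Concurrent

Module QueueVersusStack
    ' Enqueues the keys in order and returns the element TryDequeue removes
    Function FirstDequeued(ByVal keys As String()) As String
        Dim queue As New ConcurrentQueue(Of String)
        For Each key As String In keys
            queue.Enqueue(key)
        Next
        Dim result As String = Nothing
        queue.TryDequeue(result)
        Return result
    End Function

    ' Pushes the keys in order and returns the element TryPop removes
    Function FirstPopped(ByVal keys As String()) As String
        Dim stack As New ConcurrentStack(Of String)
        For Each key As String In keys
            stack.Push(key)
        Next
        Dim result As String = Nothing
        stack.TryPop(result)
        Return result
    End Function

    Sub Main()
        Dim keys As String() = {"0A1F", "9B2C", "F3D4"}
        Console.WriteLine("TryDequeue returns {0}", FirstDequeued(keys)) ' 0A1F (FIFO)
        Console.WriteLine("TryPop returns {0}", FirstPopped(keys))       ' F3D4 (LIFO)
    End Sub
End Module
```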
You already learned that LINQ is very useful for querying and processing different data sources. If you are using LINQ to Objects, you can take advantage of parallelism using its parallel implementation, Parallel LINQ (PLINQ).
As you might have expected, LINQ and PLINQ can work with the previously explained concurrent collections. The following code defines a simple but CPU-intensive function that counts and returns the number of letters in the string received as a parameter (code file: Snippet06.sln):
Function CountLetters(ByVal key As String) As Integer
    Dim letters As Integer = 0
    For i As Integer = 0 To key.Length - 1
        If Char.IsLetter(key, i) Then letters += 1
    Next
    Return letters
End Function
A simple LINQ expression that returns all the AES keys that have at least 10 letters, contain an A, an F, and a 9, and don't contain a B would look like the following:
Dim keysWith10Letters = From key In Keys
                        Where CountLetters(key) >= 10 And
                              key.Contains("A") And
                              key.Contains("F") And
                              key.Contains("9") And
                              Not key.Contains("B")
To transform the preceding LINQ expression into a PLINQ expression that can take advantage of parallelism, call the AsParallel method on the data source, as shown here:
Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 And
                              key.Contains("A") And
                              key.Contains("F") And
                              key.Contains("9") And
                              Not key.Contains("B")
This way, the query can take advantage of all the available logical cores at run time and run faster than its sequential version. However, PLINQ may still decide to run a query sequentially when it estimates that parallel execution wouldn't be beneficial.
You can add the following code at the end of the Main subroutine to display some results of the PLINQ query (code file: Snippet06.sln):
Dim sw = Stopwatch.StartNew()
' Run the PLINQ query once and store its results in a List, so that
' reading the count and the first and last keys doesn't run it again
Dim keysWith10Letters = (From key In Keys.AsParallel()
                         Where CountLetters(key) >= 10 And
                               key.Contains("A") And
                               key.Contains("F") And
                               key.Contains("9") And
                               Not key.Contains("B")).ToList()
Console.WriteLine("The code generated {0} keys with at least ten letters, A, F and 9 but no B in the hexadecimal code.", keysWith10Letters.Count)
If keysWith10Letters.Count > 0 Then
    Console.WriteLine("First key: {0}", keysWith10Letters(0))
    Console.WriteLine("Last key: {0}", keysWith10Letters(keysWith10Letters.Count - 1))
End If
Debug.WriteLine(sw.Elapsed.ToString())
Console.ReadLine()
This code displays the number of keys that meet the conditions, followed by the first and the last keys in the results of the PLINQ query that ran against the ConcurrentQueue(Of String). Because the query doesn't use AsOrdered, PLINQ doesn't guarantee that the results keep the order of the keys in the source queue, so the first and last keys can change from one run to the next.
The System.Linq.ParallelEnumerable class exposes most of PLINQ's additional functionality, including its most important method, AsParallel. The following table summarizes the PLINQ-specific methods.
Method | Description |
AsOrdered() | PLINQ must preserve the ordering of the source sequence for the rest of the query, or until the ordering changes with an Order By clause or a call to AsUnordered. |
AsParallel() | The rest of the query should be parallelized, whenever possible. |
AsSequential() | The rest of the query should run sequentially, as traditional LINQ. |
AsUnordered() | PLINQ doesn't have to preserve the ordering of the source sequence. |
ForAll() | An enumeration method that processes the results in parallel, using multiple tasks, without merging them back on the consuming thread. |
WithCancellation() | Associates a cancellation token with the query, so that its execution can be canceled, as previously explained with tasks. |
WithDegreeOfParallelism() | Sets the maximum number of concurrently executing tasks that PLINQ can use to process the query. |
WithExecutionMode() | Can force parallel execution when PLINQ's default behavior would be to run the query sequentially, as traditional LINQ. |
WithMergeOptions() | Provides hints about the way PLINQ should merge the parallel pieces of the result on the thread that is consuming the query. |
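As an illustration of AsOrdered from the preceding table, the following sketch filters a list of hexadecimal strings in parallel and checks that the results come back in the same order as a sequential LINQ query over the same source. The module OrderedQuery, the function KeysWithFOrdered, and the generated sample data are hypothetical and exist only for this example.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Collections.Generic
Imports System.Linq

Module OrderedQuery
    ' Returns the keys that contain an "F", filtering in parallel but
    ' preserving the order of the source sequence thanks to AsOrdered
    Function KeysWithFOrdered(ByVal keys As IEnumerable(Of String)) As List(Of String)
        Return (From key In keys.AsParallel().AsOrdered()
                Where key.Contains("F")).ToList()
    End Function

    Sub Main()
        ' Sample data: the numbers 0 to 99,999 as uppercase hexadecimal strings
        Dim keys = Enumerable.Range(0, 100000).Select(Function(i) i.ToString("X")).ToList()
        Dim ordered = KeysWithFOrdered(keys)
        Dim sequential = (From key In keys Where key.Contains("F")).ToList()
        ' Both lists hold the same keys in the same positions
        Console.WriteLine(ordered.SequenceEqual(sequential)) ' True
    End Sub
End Module
```

Removing AsOrdered from KeysWithFOrdered still returns the same set of keys, but their order is no longer guaranteed.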
In addition, ParallelEnumerable offers Aggregate overloads that enable the implementation of parallel reduction algorithms. These overloads perform an intermediate aggregation on each parallelized part of the query and then apply a final aggregation function that combines the results of all the generated partitions.
Sometimes it is useful to run a PLINQ query with several different degrees of parallelism to measure its scalability. For example, the following line runs the previously shown PLINQ query with no more than three concurrently executing tasks, so it takes advantage of at most three cores:
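The following sketch shows a parallel reduction with the ParallelEnumerable.Aggregate overload that receives a seed, a function that accumulates the elements of each partition, a function that combines the partial results of the partitions, and a final result selector. It totals the letters in a set of keys; the module ParallelReduction and the function TotalLetters are hypothetical, and CountLetters is repeated from the chapter so the block compiles on its own.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Linq

Module ParallelReduction
    ' Copy of the chapter's CountLetters function
    Function CountLetters(ByVal key As String) As Integer
        Dim letters As Integer = 0
        For i As Integer = 0 To key.Length - 1
            If Char.IsLetter(key, i) Then letters += 1
        Next
        Return letters
    End Function

    ' Parallel reduction: each partition adds up its own subtotal, then
    ' the subtotals are combined and passed to the result selector
    Function TotalLetters(ByVal keys As String()) As Integer
        Return keys.AsParallel().Aggregate(
            0,
            Function(subtotal As Integer, key As String) subtotal + CountLetters(key),
            Function(total As Integer, subtotal As Integer) total + subtotal,
            Function(total As Integer) total)
    End Function

    Sub Main()
        Dim keys As String() = {"A1F9", "BBBB", "1234"}
        Console.WriteLine(TotalLetters(keys)) ' 2 + 4 + 0 = 6
    End Sub
End Module
```

Because each partition can start from the seed value, the seed should be the identity value of the operation (0 for a sum). When the accumulator is a mutable object, the overload that receives a seed factory function creates a separate seed for each partition.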
Dim keysWith10Letters = From key In Keys.AsParallel().WithDegreeOfParallelism(3)
                        Where CountLetters(key) >= 10 And
                              key.Contains("A") And
                              key.Contains("F") And
                              key.Contains("9") And
                              Not key.Contains("B")
Because AsOrdered and the Order By clause can reduce the speed gains of PLINQ queries, it is very important to compare the speedup achieved against the sequential version before requesting ordered results.
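To measure scalability, you can run the same query in a loop with increasing degrees of parallelism and time each run with a Stopwatch. The following sketch does this under some assumptions: the module DegreeOfParallelismSweep and its functions are hypothetical, CreateSampleKeys uses uppercase GUID strings as a stand-in for the chapter's AES keys, and Main caps the sweep at 16 purely as an arbitrary safety limit.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.Linq

Module DegreeOfParallelismSweep
    ' Copy of the chapter's CountLetters function
    Function CountLetters(ByVal key As String) As Integer
        Dim letters As Integer = 0
        For i As Integer = 0 To key.Length - 1
            If Char.IsLetter(key, i) Then letters += 1
        Next
        Return letters
    End Function

    ' Hypothetical stand-in for the AES keys: 32 uppercase hexadecimal digits
    Function CreateSampleKeys(ByVal count As Integer) As String()
        Dim keys(count - 1) As String
        For i As Integer = 0 To count - 1
            keys(i) = Guid.NewGuid().ToString("N").ToUpperInvariant()
        Next
        Return keys
    End Function

    ' Runs the PLINQ query with degrees of parallelism 1 to maxDegree,
    ' writes the elapsed time of each run, and returns each run's count
    Function MeasureDegrees(ByVal keys As String(), ByVal maxDegree As Integer) As List(Of Integer)
        Dim counts As New List(Of Integer)
        For degree As Integer = 1 To maxDegree
            Dim sw = Stopwatch.StartNew()
            Dim count = (From key In keys.AsParallel().WithDegreeOfParallelism(degree)
                         Where CountLetters(key) >= 10 And
                               key.Contains("A") And
                               key.Contains("F") And
                               key.Contains("9") And
                               Not key.Contains("B")).Count()
            sw.Stop()
            Console.WriteLine("Degree {0}: {1} keys in {2}", degree, count, sw.Elapsed)
            counts.Add(count)
        Next
        Return counts
    End Function

    Sub Main()
        Dim keys = CreateSampleKeys(1000000)
        MeasureDegrees(keys, Math.Min(Environment.ProcessorCount, 16))
        Console.ReadLine()
    End Sub
End Module
```

Every run works on the same array, so the counts must match; only the elapsed times should change as the degree of parallelism grows.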
If a PLINQ query doesn't achieve significant performance improvements, you have another interesting option to take advantage of parallelism: running many LINQ queries in independent tasks or using Parallel.Invoke.
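The following sketch shows the second approach: two independent, sequential LINQ queries that run at the same time with Parallel.Invoke. The module IndependentQueries and the function CountKeys are hypothetical. Each lambda writes only its own local variable, and Parallel.Invoke returns after both actions have finished, so the results can be read safely afterward.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Linq
Imports System.Threading.Tasks

Module IndependentQueries
    ' Runs two independent LINQ to Objects queries concurrently.
    ' Item1: keys that contain an F. Item2: keys that don't contain a B.
    Function CountKeys(ByVal keys As String()) As Tuple(Of Integer, Integer)
        Dim withF As Integer = 0
        Dim withoutB As Integer = 0
        Parallel.Invoke(
            Sub() withF = (From key In keys Where key.Contains("F")).Count(),
            Sub() withoutB = (From key In keys Where Not key.Contains("B")).Count())
        Return Tuple.Create(withF, withoutB)
    End Function

    Sub Main()
        Dim result = CountKeys(New String() {"F1", "B2", "FB", "33", "44"})
        Console.WriteLine("With F: {0}, without B: {1}", result.Item1, result.Item2) ' 2 and 3
    End Sub
End Module
```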
The ForAll extension method is very useful for processing the results of a query in parallel without having to write a parallel loop. It receives an Action(Of TSource) delegate and invokes it for each element of the results, so, as with the Task constructors, you can use lambda expressions to define the parallelized processing actions for the results of a PLINQ query. The following lines add elements in parallel to a new ConcurrentBag (keysBag), an unordered collection of Integer, with the number of letters in each of the keys in the results of the previous PLINQ query (code file: Snippet07.sln):
Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 And
                              key.Contains("A") And
                              key.Contains("F") And
                              key.Contains("9") And
                              Not key.Contains("B")
Dim keysBag As New ConcurrentBag(Of Integer)
' ForAll runs the action for each result in parallel, without merging
' the results back on the current thread
keysWith10Letters.ForAll(Sub(key) keysBag.Add(CountLetters(key)))
This chapter provided an overview of the task-based programming model introduced with .NET Framework 4 and improved in .NET Framework 4.5. The chapter introduced some of its classes, structures, and enumerations. In order to help you tackle the multicore revolution, it also explained several related concepts used in basic concurrent and parallel programming designs, including the following key points:
The next chapter will dive deep into the different deployment options for .NET applications. You will learn to easily select the most convenient deployment mechanism according to the kind of application you have developed and your target environments.