Book: Professional Visual Basic 2012 and .NET 4.5

Chapter 19

Parallel Programming Using Tasks and Threads

What's in this chapter?

Understanding the task-based programming model, the Task Parallel Library, and its related hardware technologies

Launching, controlling, managing, and synchronizing parallel tasks

Refactoring loops to run them in parallel using Parallel.For and Parallel.ForEach

Transforming existing sequential code into parallelized code

Measuring the speed gain and the scalability offered by parallelized code

Working with different degrees of parallelism

Understanding the advantages of working with concurrent collections

Implementing a simple parallel producer-consumer pattern

Parallelizing LINQ queries using PLINQ

You can find the code downloads for this chapter on the Download Code tab. The code is in the chapter 19 download and is individually named according to the code file names listed throughout the chapter.

In the last few years, multicore technology has become mainstream in CPU design, and microprocessor manufacturers continue to improve processing power. However, the shift to multicore is an inflection point for software design philosophy.

This chapter is about the lightweight concurrency model introduced by Visual Basic 2010 with .NET Framework 4 and extended in Visual Basic 2012 with .NET Framework 4.5. A comprehensive treatment of the challenges offered by the new multicore designs could easily fill 600 pages or more, so this chapter attempts to strike a reasonable balance between detail and succinctness.

Figure 19.1 shows three of the possible concurrent execution scenarios that could take place with different hardware configurations or workloads. Keep in mind that the same code doesn't always take the same time to run. Therefore, the ConvertText method could sometimes take more time than the ConvertLines method, even with the same hardware configuration and input data stream.

Figure 19.1: Three possible parallel executions of four methods launched with Parallel.Invoke

The top diagram represents an almost ideal situation: the four subroutines run in parallel. Even so, you must consider the time needed to schedule the concurrent tasks, which adds an initial overhead to the overall time.

The middle diagram shows a scenario with just two concurrent lanes and four subroutines to run. On one lane, ConvertRectangles starts once ConvertEllipses finishes. On the other lane, ConvertText starts once ConvertLines finishes. In this case, Parallel.Invoke takes more time to run all the subroutines than in the previous scenario.

The bottom diagram shows another scenario, with three concurrent lanes. However, it takes almost the same time as the middle scenario, because in this case the ConvertLines subroutine takes longer to run. Thus, even with one additional parallel lane, Parallel.Invoke needs almost the same time to run all the subroutines.


Note
Code that runs concurrently through Parallel.Invoke must not rely on a specific execution order. If your concurrent code needs a specific execution order, you can use other mechanisms provided by the TPL, which are covered in detail later in this chapter.
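To see that the order isn't fixed, the following sketch runs four hypothetical stand-ins for ConvertEllipses, ConvertRectangles, ConvertLines, and ConvertText through Parallel.Invoke and records the order in which they finish. The SimulateWork subroutine and its iteration counts are assumptions made for illustration; they only consume CPU time.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading.Tasks

Module ParallelInvokeOrderSketch
    ' Hypothetical stand-in for the chapter's conversion subroutines:
    ' it burns CPU time and then records its name as finished.
    Private Sub SimulateWork(name As String, iterations As Integer,
                             finished As ConcurrentQueue(Of String))
        Dim acc As Double = 0
        For i As Integer = 1 To iterations
            acc += Math.Sqrt(i)
        Next
        ' ConcurrentQueue is safe to use from several threads at once.
        finished.Enqueue(name)
    End Sub

    ' Runs the four subroutines with Parallel.Invoke and returns the
    ' order in which they completed.
    Public Function RunAll() As String()
        Dim finished As New ConcurrentQueue(Of String)()
        Parallel.Invoke(
            Sub() SimulateWork("ConvertEllipses", 2000000, finished),
            Sub() SimulateWork("ConvertRectangles", 1000000, finished),
            Sub() SimulateWork("ConvertLines", 4000000, finished),
            Sub() SimulateWork("ConvertText", 3000000, finished))
        ' Parallel.Invoke returns only after all four delegates have finished.
        Return finished.ToArray()
    End Function

    Sub Main()
        For Each name As String In RunAll()
            Console.WriteLine(name)
        Next
    End Sub
End Module
```

Running it several times, or on machines with different numbers of cores, may print different orders; the only guarantee is that Parallel.Invoke returns after all four delegates have completed.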

Advantages and Disadvantages

The key advantage of using Parallel.Invoke is its simplicity: you can run many subroutines in parallel without having to worry about tasks or threads. However, it isn't suitable for every situation in which parallel execution could help. Parallel.Invoke has many trade-offs, including the following:


Note
The aforementioned trade-offs apply to the use of Parallel.Invoke as explained in the examples. However, you can combine different techniques to overcome many of them, and you will learn about many of these mechanisms in this chapter. Parallel.Invoke is ideal for getting started with parallelism and for measuring the potential speed gains of running CPU-intensive methods in parallel. You can improve the code later using the other parallelization methods provided by the TPL.

Parallelism and Concurrency

The previous example provides a good transition to the differences between parallelism and concurrency, which aren't the same thing, as shown in Figure 19.2.

Figure 19.2: Possible scenarios for the execution of four methods in 1, 4, and 2 cores

Concurrency means that different parts of code can start, run, and complete in overlapping time periods. Concurrency can happen even on computers with a single logical core. When many parts of code run concurrently on a computer with a single logical core, time-slicing mechanisms and fast context switches can give the impression of parallel execution. However, on this hardware, it takes more time to run many parts of code concurrently than to run a single part of code alone, because the concurrent code competes for hardware resources (refer to Figure 19.2). You can think of concurrency as many cars sharing a single lane. This is why concurrency is also defined as a form of virtual parallelism, but it isn't real parallelism.

Parallelism means that different parts of code can actually run simultaneously, that is, at the same time, taking advantage of real parallel processing capabilities found in the underlying hardware. Parallelism isn't possible on computers with a single logical core; you need at least two logical cores to run parallel code. When many parts of code run in parallel on a computer with multiple logical cores, time-slicing mechanisms and context switches still occur, because typically many other parts of code are also trying to use processor time. However, when real parallelism occurs, you can achieve speed gains, because running many parts of code in parallel can reduce the overall time needed to complete certain algorithms. The diagram in Figure 19.2 offers two possible parallelism scenarios:

1. An ideal situation: perfect parallelism on four logical cores (four lanes). The instructions for each of the four methods run on a different logical core.
2. Imperfect parallelism, a combination of concurrency and parallelism, in which four methods share just two logical cores (two lanes). Sometimes the instructions for each of the four methods run in parallel on different logical cores, and sometimes they have to wait for their time slice. This is the most common situation, because it is very difficult to achieve perfect parallelism, even on real-time operating systems (RTOS).

Note
When parts of code run in parallel with other parts, new bugs caused by parallelism itself can appear; that is, they show up only when certain parts of code run at exactly the same time. These bugs can be difficult to locate, which makes parallel programming even more complex than concurrent programming. Luckily, the TPL offers many structures and new debugging features that can help you avoid many parallelism nightmares.
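A classic example of such a bug is a race condition on shared data. The following sketch (hypothetical names, not one of the chapter's code files) increments a shared counter from a Parallel.For loop, first with the plain += operator and then with Interlocked.Increment from System.Threading.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module RaceConditionSketch
    Public Const Iterations As Integer = 1000000

    ' Unsafe: "counter += 1" reads, adds, and writes back in separate steps,
    ' so two iterations running at the same time can lose an update.
    Public Function UnsafeCount() As Integer
        Dim counter As Integer = 0
        Parallel.For(0, Iterations, Sub(i) counter += 1)
        Return counter
    End Function

    ' Safe: Interlocked.Increment performs the whole update atomically.
    Public Function SafeCount() As Integer
        Dim counter As Integer = 0
        Parallel.For(0, Iterations, Sub(i) Interlocked.Increment(counter))
        Return counter
    End Function

    Sub Main()
        ' The unsafe total can be lower than Iterations and can change per run.
        Console.WriteLine("Unsafe: {0}", UnsafeCount())
        Console.WriteLine("Safe:   {0}", SafeCount())
    End Sub
End Module
```

On some runs the unsafe version may even print the correct total, which is exactly what makes this kind of bug hard to find.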

Figure 19.3 shows the sequential execution flow for this application and the time it takes to run each of the two aforementioned subroutines on a specific computer with a dual-core microprocessor.

Figure 19.3: Sequential execution flow of two subroutines

Together, GenerateAESKeys and GenerateMD5Hashes need approximately 14 seconds to run: the former takes 6 seconds and the latter 8 seconds. Of course, these times vary considerably with the underlying hardware configuration.

There is no interaction between these two subroutines; they are completely independent of each other. Because the subroutines run one after the other, sequentially, they don't take advantage of the parallel processing capabilities offered by the additional core(s). Therefore, these two subroutines represent a clear hotspot where parallelism could help achieve a significant speedup over sequential execution. For example, it is possible to run both subroutines in parallel using Parallel.Invoke.

Measuring Speedups Achieved by Parallel Execution

Replace the Main subroutine shown in the simple console application with the following new version, launching both GenerateAESKeys and GenerateMD5Hashes in parallel, using Parallel.Invoke (code file: Snippet02.sln):

Sub Main()
    Dim sw = Stopwatch.StartNew()
    Parallel.Invoke(Sub() GenerateAESKeys(), Sub() GenerateMD5Hashes())
    Console.WriteLine(sw.Elapsed.ToString())
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub

Figure 19.4 shows the parallel execution flow for the new version of this application and the time it takes to run each of the two subroutines on a specific computer with a dual-core microprocessor.

Figure 19.4: Parallel execution flow for two subroutines with a dual-core microprocessor

Now, GenerateAESKeys and GenerateMD5Hashes need approximately nine seconds to run because they take advantage of both cores offered by the microprocessor. Thus, it is possible to calculate the speedup achieved using the following formula:

Speedup = (Serial execution time)/(Parallel execution time)

In the preceding example, the speedup is 14 / 9 ≈ 1.56, usually expressed as a 1.56x speedup over the sequential version. GenerateMD5Hashes takes more time than GenerateAESKeys to run, nine seconds versus six seconds. However, Parallel.Invoke doesn't continue with the next line until all the delegates finish their execution. Therefore, for approximately three seconds, the application isn't taking advantage of one of the cores, as shown in Figure 19.5.

Figure 19.5: Example of inefficient scheduling when using Parallel.Invoke on a dual-core microprocessor

In addition, if this application runs on a computer with a quad-core microprocessor, its speedup over the sequential version would be nearly the same, as it won't scale to take advantage of the two additional cores found in the underlying hardware.
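The speedup formula can also be applied in code, using the same Stopwatch technique as Main. The following sketch is only an illustration: BusyWork is a hypothetical CPU-bound function standing in for GenerateAESKeys and GenerateMD5Hashes, and the iteration count is arbitrary.

```vb
Option Infer On

Imports System
Imports System.Diagnostics
Imports System.Threading.Tasks

Module SpeedupSketch
    ' Speedup = (Serial execution time) / (Parallel execution time)
    Public Function Speedup(serialSeconds As Double,
                            parallelSeconds As Double) As Double
        If parallelSeconds <= 0 Then
            Throw New ArgumentOutOfRangeException("parallelSeconds")
        End If
        Return serialSeconds / parallelSeconds
    End Function

    ' Hypothetical CPU-bound workload standing in for GenerateAESKeys and
    ' GenerateMD5Hashes.
    Private Function BusyWork(iterations As Integer) As Double
        Dim acc As Double = 0
        For i As Integer = 1 To iterations
            acc += Math.Sqrt(i)
        Next
        Return acc
    End Function

    ' Times two calls run one after the other, then the same two calls
    ' launched with Parallel.Invoke, and returns the resulting speedup.
    Public Function MeasureSpeedup(iterations As Integer) As Double
        Dim sw = Stopwatch.StartNew()
        BusyWork(iterations)
        BusyWork(iterations)
        Dim serialSeconds = sw.Elapsed.TotalSeconds

        sw.Restart()
        Parallel.Invoke(Sub() BusyWork(iterations), Sub() BusyWork(iterations))
        Dim parallelSeconds = sw.Elapsed.TotalSeconds

        Return Speedup(serialSeconds, parallelSeconds)
    End Function

    Sub Main()
        ' Figures from the text: 14 seconds sequential, 9 seconds parallel.
        Console.WriteLine("Text example: {0:F2}x", Speedup(14, 9))
        ' Measured on the current machine; the value varies with hardware and load.
        Console.WriteLine("Measured: {0:F2}x", MeasureSpeedup(50000000))
    End Sub
End Module
```

Because both workloads are equal in this sketch, the measured value on a dual-core machine should approach 2x; with unequal workloads, as in GenerateAESKeys and GenerateMD5Hashes, the longer one caps the speedup.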

In this section, you saw how it is possible to detect hotspots by adding some code to measure the elapsed time to run certain methods. By changing just a few lines of code, a noticeable improvement in speed was achieved. Now it is time to learn other TPL structures that can help to achieve better results and offer improved scalability when the number of available cores increases.


Note
There is no need to initialize TPL in order to begin working with its classes and methods. TPL does a lot of work under the hood and does its best to optimize its scheduling mechanisms to take advantage of the underlying hardware at runtime. However, choosing the right structure to parallelize a hotspot is a very important task.

Understanding Parallel

Now, uncomment the lines that send output to the console in both GenerateAESKeys and GenerateMD5Hashes:

Console.WriteLine(ConvertToHexString(result))  

Writing to the console generates a bottleneck for the parallel execution. However, this time there is no need to measure accurate times. Instead, you can view the output to confirm that both methods are running in parallel. The following lines show a sample console output generated by this application. The shorter hexadecimal strings (32 characters) correspond to the MD5 hashes; the longer ones (64 characters) represent AES keys. Each AES key takes less time to generate than each MD5 hash. Remember that the code creates 800,000 AES keys (NUM_AES_KEYS) and 900,000 MD5 hashes (NUM_MD5_HASHES). Depending on your environment, the code might take a long time to complete.

0364DBC9A8FA3EAC793FC53AAE6D0193484087634C3033C470D96C72F89D7254
E410BCB82B36729CB7CCCCDFE30746F2DF141CC8275790360E2ED731F8C7113D
66CF85EA8FC77746A7C4A116F68D802D7167AE9E7C5FB0B6B85D44B8929386DE
0421897DCF492380BADF872205AE32D94632C60022A4E965652524D7023C59AD
C3BEF1DFFF5A9CAB11BFF8EA3F7DEFC97D91562A358DB56477AD445ACB4F1DE3
AF521D65489CA5C69517E32E652D464676E5F2487E438124DBF9ACF4157301AA
A641EB67C88A29985CFB0B2097B12CFB9296B4659E0949F20271984A3868E0B3
D7A05587DFDFD0C49BEF613F2EB78A43
90BF115C60B2DECA60C237F3D06E42EE
B3519CBA0137FD814C09371836F90322
1415C19F7F93306D35186721AF6B8DDE56427BB9AF29D22E37B34CB49E96BB49
208B73D3E6468F48B950E5F5006DDF30FE7A1B3BCC46489F7722BD98D54079D7
ACD0312DFF1BF29ECA2721DAFA9B20AB5FBDBD20E76C150C5CCE4026990C9D26
EB68C902145439F2A66514B9D89E9A958F18EE15D491014D3DCB312781F277D1
9DB8ABF087C78091F1E77AC769FF175A
F3EFB2804A969D890AFABCE17E84B26E
B342A8A253003754B752B85C67DA1560F30CD36A1AA759A0010E1F8E5045CBB5
9681656DC08F29AB1911A1CCCFBE6B468D1DF7B9D8722324E5E2BB4A314EC649
7DE56E111213655F54D6F8656238CA5E
196D194BA2B786EADD1B6852645C67C5
BA7AC6B878064E98D98336CA5DE45DEC
875DAB451CCE3B5FBD8E5091BAD1A8ED7DB2FF8C9E3EEA834C6DEA7C2467F27E
C1AA2CB88AB669317CB90CD842BF01DB26C6A655D10660AF01C37ECC7AEDA267
66E1F4F56E04FC9BFF225F68008A129D93F9B277ADAB43FF764FB87FFD098B78

Now, comment out the lines that send output to the console in both GenerateAESKeys and GenerateMD5Hashes again.

Figure 19.6 represents one of the possible execution flows, taking advantage of the four cores. Each box shown inside a method represents a chunk, automatically created by Parallel.For at run time.

Figure 19.6: Representation of the execution flows that take advantage of four cores with Parallel.For

Parallel.ForEach

Sometimes, refactoring an existing For loop, as previously explained, can be a very complex task, and the changes to the code could add too much overhead to each iteration, reducing overall performance. A useful alternative is to partition the data to be processed into parts that can run as smaller loops in parallel. To do so, you define a custom partitioner: a tailored mechanism that splits the input data into specific pieces, overriding the default partitioning mechanism. Using a Parallel.ForEach loop with a custom partitioner, you can create new versions of the sequential loops through a simpler refactoring process.

The code in file Listing05.sln shows the refactored loops, using the imperative data-parallelism syntax offered by Parallel.ForEach, combined with a sequential For loop and a custom partitioner created with System.Collections.Concurrent.Partitioner. The new methods, ParallelPartitionGenerateAESKeys and ParallelPartitionGenerateMD5Hashes, also try to take advantage of all the available cores, relying on the work done under the hood by Parallel.ForEach and on range partitioning, which distributes smaller sequential loops across the available cores. The code also adapts its behavior to the existing hardware at run time.

The code uses another important namespace for TPL, the new System.Collections.Concurrent namespace. This namespace offers access to useful collections prepared for concurrency and custom partitioners introduced for the first time in .NET Framework 4. Therefore, it is a good idea to import this namespace by using Imports System.Collections.Concurrent to work with the new examples (code file: Listing05.sln):

Sub ParallelPartitionGenerateAESKeys()
    Dim sw = Stopwatch.StartNew()
    Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1),
             Sub(range)
                 Dim aesM As New AesManaged()
                 Debug.WriteLine("Range ({0}, {1}. Time: {2})",
                     range.Item1, range.Item2, Now().TimeOfDay)
                 For i As Integer = range.Item1 To range.Item2 - 1
                     aesM.GenerateKey()
                     Dim result = aesM.Key
                     Dim hexString = ConvertToHexString(result)
                     ' Console.WriteLine("AES: " +
                     '     ConvertToHexString(result))
                 Next
             End Sub)
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Sub ParallelPartitionGenerateMD5Hashes()
    Dim sw = Stopwatch.StartNew()
    Parallel.ForEach(Partitioner.Create(1, NUM_MD5_HASHES + 1),
             Sub(range)
                 Dim md5M As MD5 = MD5.Create()
                 For i As Integer = range.Item1 To range.Item2 - 1
                     Dim data = Encoding.Unicode.GetBytes(i.ToString())
                     Dim result = md5M.ComputeHash(data)
                     Dim hexString = ConvertToHexString(result)
                     ' Console.WriteLine(ConvertToHexString(result))
                 Next
             End Sub)
    Console.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

The class function Parallel.ForEach offers 20 overloads. The definition used in this code file has the following parameters:

In addition, Parallel.ForEach can return a ParallelLoopResult value. The information offered in this structure is covered in detail later in this chapter.

Working with Partitions in a Parallel Loop

The code in file Listing03.sln showed the original GenerateAESKeys subroutine with the sequential For loop. The inner For loop in file Listing05.sln is that same sequential For loop. The only line that changes is the For definition, which takes into account the lower bound and the upper bound of the partition, provided by range.Item1 and range.Item2:

For i As Integer = range.Item1 To range.Item2 - 1  

In this case, it is easier to refactor the sequential loop because there is no need to move local variables. The only difference is that instead of working with the entire source data, it splits it into many independent and potentially parallel partitions. Each one works with a sequential inner loop.

The following call to the Partitioner.Create method defines the partitions as the first parameter for Parallel.ForEach:

Partitioner.Create(1, NUM_AES_KEYS + 1)  

This line splits the range from 1 to NUM_AES_KEYS into many partitions, each represented by a Tuple(Of Integer, Integer) whose Item1 is the inclusive lower bound and whose Item2 is the exclusive upper bound. However, it doesn't specify the number of partitions to create. ParallelPartitionGenerateAESKeys includes a line that writes the lower and upper bounds of each generated partition and the actual time when the sequential loop for that range starts to run:

Debug.WriteLine("Range ({0}, {1}. Time: {2})",
    range.Item1, range.Item2, Now().TimeOfDay)
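To inspect the ranges without timing noise, the following sketch collects the tuples produced by Partitioner.Create into a ConcurrentBag and sorts them by their lower bound. CollectRanges is a hypothetical helper, not part of Listing05.sln; the number of ranges it returns depends on Environment.ProcessorCount, but together the ranges cover 1 to count without gaps.

```vb
Option Infer On

Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Threading.Tasks

Module DefaultPartitionSketch
    ' Hypothetical helper: returns the ranges that Partitioner.Create builds
    ' for the numbers 1..count, sorted by their lower bound.
    Public Function CollectRanges(count As Integer) As Tuple(Of Integer, Integer)()
        Dim ranges As New ConcurrentBag(Of Tuple(Of Integer, Integer))()
        ' Item1 is inclusive and Item2 is exclusive, hence count + 1.
        Parallel.ForEach(Partitioner.Create(1, count + 1),
                         Sub(range) ranges.Add(range))
        Return ranges.OrderBy(Function(r) r.Item1).ToArray()
    End Function

    Sub Main()
        Dim ranges = CollectRanges(800000)
        For Each r In ranges
            Console.WriteLine("({0}, {1})", r.Item1, r.Item2)
        Next
        Console.WriteLine("{0} ranges on {1} logical cores",
                          ranges.Length, Environment.ProcessorCount)
    End Sub
End Module
```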

Replace the Main subroutine with the following new version, launching first ParallelPartitionGenerateAESKeys and then ParallelPartitionGenerateMD5Hashes (code file: Listing05.sln):

Sub Main()
    Dim sw = Stopwatch.StartNew()
    ParallelPartitionGenerateAESKeys()
    ParallelPartitionGenerateMD5Hashes()
    Console.WriteLine(sw.Elapsed.ToString())
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub

As shown in the following lines, the partitioner creates 13 ranges. Thus, Parallel.ForEach runs 13 sequential inner For loops, one per range. However, they don't all start at the same time, because that wouldn't be a good idea with four cores available; the parallelized loop tries to load-balance the execution, taking the available hardware resources into account. The second line of the output shows the complexity added by both parallelism and concurrency: judging by the timestamps, the first partition that reaches the sequential inner For loop is (66667, 133333), not (1, 66667). Remember that the upper bound values shown in the following output are exclusive.

Range (133333, 199999. Time: 15:45:38.2205775)
Range (66667, 133333. Time: 15:45:38.2049775)
Range (266665, 333331. Time: 15:45:38.2361775)
Range (199999, 266665. Time: 15:45:38.2205775)
Range (1, 66667. Time: 15:45:38.2205775)
Range (333331, 399997. Time: 15:45:39.0317789)
Range (399997, 466663. Time: 15:45:39.0317789)
Range (466663, 533329. Time: 15:45:39.1097790)
Range (533329, 599995. Time: 15:45:39.2345793)
Range (599995, 666661. Time: 15:45:39.3281794)
Range (666661, 733327. Time: 15:45:39.9365805)
Range (733327, 799993. Time: 15:45:40.0145806)
Range (799993, 800001. Time: 15:45:40.1705809)

In addition, the order in which the lines appear in the debug output doesn't match the order of the ranges, because there are many concurrent calls to WriteLine. In fact, when measuring speedups, it is very important to comment out these lines, which run before each inner loop begins, because they affect the overall time by generating a bottleneck.

This new version using Parallel.ForEach with custom partitions needs approximately the same time as the previous Parallel.For version to run.

Optimizing Partitions According to Number of Cores

It is possible to tune the generated partitions in order to match them with the number of logical cores found at run time. System.Environment.ProcessorCount offers the number of logical cores or logical processors detected by the operating system. Hence, it is possible to use this value to calculate the desired range size for each partition and use it as a third parameter for the call to Partitioner.Create, using the following formula:

((numberOfElements / numberOfLogicalCores) + 1)

ParallelPartitionGenerateAESKeys can use the following code to create the partitions:

Partitioner.Create(1,
    NUM_AES_KEYS + 1,
    (CInt(NUM_AES_KEYS / Environment.ProcessorCount) + 1))

A very similar line can also help to improve ParallelPartitionGenerateMD5Hashes:

Partitioner.Create(1,
    NUM_MD5_HASHES + 1,
    (CInt(NUM_MD5_HASHES / Environment.ProcessorCount) + 1))

As shown in the following lines, now the partitioner creates four ranges because the desired range size is CInt((800000 / 4) + 1) = 200001. Thus, the Parallel.ForEach will run four sequential inner For loops with ranges, according to the number of available logical cores.

Range (1, 200002. Time: 16:32:51.3754528)
Range (600004, 800000. Time: 16:32:51.3754528)
Range (400003, 600004. Time: 16:32:51.3754528)
Range (200002, 400003. Time: 16:32:51.3754528)

Now, ParallelPartitionGenerateAESKeys and ParallelPartitionGenerateMD5Hashes need approximately 3.40 seconds to run, because each one generates as many partitions as available cores and uses a sequential loop in each delegate, which reduces the previously added overhead. Thus, the speedup achieved is 11 / 3.4 ≈ 3.24x over the sequential version. The reduced overhead cuts the time from 4.1 seconds to 3.4 seconds.
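The effect of the third argument of Partitioner.Create can be checked in isolation. In the following hypothetical sketch, the number of cores is a parameter so that the result doesn't depend on the machine. It also uses integer division (\), whereas the chapter's code uses CInt with /; the two can differ by one when the division isn't exact.

```vb
Option Infer On

Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Threading.Tasks

Module CoreSizedPartitionSketch
    ' Hypothetical helper: partitions 1..count using the range size
    ' (count / cores) + 1, which yields at most one range per core.
    Public Function CollectCoreSizedRanges(count As Integer,
                                           cores As Integer) As Tuple(Of Integer, Integer)()
        Dim rangeSize As Integer = (count \ cores) + 1
        Dim ranges As New ConcurrentBag(Of Tuple(Of Integer, Integer))()
        Parallel.ForEach(Partitioner.Create(1, count + 1, rangeSize),
                         Sub(range) ranges.Add(range))
        Return ranges.OrderBy(Function(r) r.Item1).ToArray()
    End Function

    Sub Main()
        For Each r In CollectCoreSizedRanges(800000, Environment.ProcessorCount)
            Console.WriteLine("({0}, {1})", r.Item1, r.Item2)
        Next
    End Sub
End Module
```

With count = 800000 and cores = 4, rangeSize is 200001, so the sketch returns four ranges: (1, 200002), (200002, 400003), (400003, 600004), and (600004, 800001).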


Note
Most of the time, the load-balancing schemes used by TPL under the hood are very efficient. However, you know your designs, code, and algorithms better than TPL at run time. Therefore, considering the capabilities offered by modern hardware architectures and using many of the features included in TPL, you can improve overall performance, reducing unnecessary overhead introduced by the first loop parallelization without the custom partitioner.

The diagram in Figure 19.7 represents one of the possible execution flows, with the lower and upper bounds of each partition, taking advantage of the four cores with the optimized partitioning scheme.

Figure 19.7: Lower and upper bounds for each partition when executed on four cores

Working with IEnumerable Sources of Data

Parallel.ForEach is also useful for refactoring existing For Each loops that iterate over a collection exposing an IEnumerable interface. The following code (code file: Listing08.sln) uses the simplest overload of the class function Parallel.ForEach, which receives the source collection and the delegate to run for each element, to generate a new version of the MD5 hash generation subroutine, ParallelForEachGenerateMD5Hashes:

Private Function GenerateMD5InputData() As IEnumerable(Of Integer)
    Return Enumerable.Range(1, NUM_MD5_HASHES)
End Function

Sub ParallelForEachGenerateMD5Hashes()
    Dim sw = Stopwatch.StartNew()
    Dim inputData = GenerateMD5InputData()

    Parallel.ForEach(inputData, Sub(number As Integer)
                         ' Each iteration creates its own MD5 instance, because
                         ' a HashAlgorithm instance isn't thread-safe
                         Dim md5M As MD5 = MD5.Create()
                         Dim data =
                            Encoding.Unicode.GetBytes(number.ToString())
                         ' Local variables, so iterations don't share state
                         Dim result = md5M.ComputeHash(data)
                         Dim hexString = ConvertToHexString(result)
                         ' Console.WriteLine(ConvertToHexString(result))
                     End Sub)
    Debug.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

The GenerateMD5InputData function returns a sequence of Integer numbers from 1 to NUM_MD5_HASHES (inclusive). Instead of using a loop to control the numbers for each iteration, the code in the ParallelForEachGenerateMD5Hashes subroutine saves this sequence in the inputData local variable.

The following line calls Parallel.ForEach with the source (inputData) and a multiline lambda delegate subroutine, receiving the number for each iteration:

Parallel.ForEach(inputData, Sub(number As Integer)  

The line that prepares the input data for the hash computing method also changes to use the value found in number:

Dim data = Encoding.Unicode.GetBytes(number.ToString())  
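The subroutine above discards each hash. When every iteration produces a result that must be kept, the results need a thread-safe destination. The following sketch, which isn't part of Listing08.sln, stores each hash in a ConcurrentDictionary keyed by the input number. ToHex is a hypothetical replacement for the chapter's ConvertToHexString, whose code isn't shown here.

```vb
Option Infer On

Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Security.Cryptography
Imports System.Text
Imports System.Threading.Tasks

Module ForEachResultsSketch
    ' Hypothetical replacement for the chapter's ConvertToHexString helper.
    Private Function ToHex(bytes As Byte()) As String
        Return BitConverter.ToString(bytes).Replace("-", "")
    End Function

    ' Hashes the numbers 1..count in parallel and keeps every result.
    Public Function HashAll(count As Integer) As ConcurrentDictionary(Of Integer, String)
        Dim results As New ConcurrentDictionary(Of Integer, String)()
        Parallel.ForEach(Enumerable.Range(1, count),
                         Sub(number As Integer)
                             ' A new MD5 instance per iteration, because a
                             ' HashAlgorithm instance isn't thread-safe.
                             Using md5M As MD5 = MD5.Create()
                                 Dim data = Encoding.Unicode.GetBytes(number.ToString())
                                 ' The indexer adds or replaces the entry safely.
                                 results(number) = ToHex(md5M.ComputeHash(data))
                             End Using
                         End Sub)
        Return results
    End Function

    Sub Main()
        Dim results = HashAll(1000)
        Console.WriteLine("{0} hashes; hash of 1 = {1}", results.Count, results(1))
    End Sub
End Module
```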

Note
In this case, performance isn't good compared with the other versions. However, when each iteration performs time-consuming operations, iterating over an IEnumerable collection in parallel can improve performance. This clearly isn't an optimal implementation, because the code has to iterate over the 900,000 items of a sequence. It does so in parallel, but it takes more time than running loops with less overhead, and it also consumes more memory. The example isn't intended as a best practice for this case; the idea is to understand the different opportunities offered by the Parallel class methods and to be able to evaluate them.

Exiting from Parallel Loops

If you want to interrupt a sequential For or For Each loop, you can use Exit For. Parallel loops require more complex code, because exiting the delegate body (a Sub or a Function) only ends the current iteration and has no effect on the rest of the parallel loop, which calls that delegate again for each new iteration. In addition, because it is a delegate, it is disconnected from the traditional loop structure.

The following code (code file: Listing09.sln) shows a new version of the ParallelForEachGenerateMD5Hashes subroutine, called ParallelForEachGenerateMD5HashesBreak. Now, the loopResult local variable saves the result of calling the Parallel.ForEach class function. Moreover, the delegate body subroutine receives a second parameter: a ParallelLoopState instance.

Private Sub DisplayParallelLoopResult(
    ByVal loopResult As ParallelLoopResult)
    Dim text As String
    If loopResult.IsCompleted Then
        text = "The loop ran to completion."
    Else
        If loopResult.LowestBreakIteration.HasValue = False Then
            text = "The loop ended prematurely with a Stop statement."
        Else
            text = "The loop ended by calling the Break statement."
        End If
    End If
    Console.WriteLine(text)
End Sub

Sub ParallelForEachGenerateMD5HashesBreak()
    Dim sw = Stopwatch.StartNew()
    Dim inputData = GenerateMD5InputData()

    Dim loopResult = Parallel.ForEach(
        inputData,
        Sub(number As Integer,
            loopState As ParallelLoopState)
            'If loopState.ShouldExitCurrentIteration Then
            '    Exit Sub
            'End If
            Dim md5M As MD5 = MD5.Create()
            Dim data = Encoding.Unicode.GetBytes(number.ToString())
            Dim result = md5M.ComputeHash(data)
            Dim hexString = ConvertToHexString(result)
            If (sw.Elapsed.Seconds > 3) Then
                loopState.Break()
                Exit Sub
            End If
            ' Console.WriteLine(ConvertToHexString(result))
        End Sub)
    DisplayParallelLoopResult(loopResult)
    Console.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

Private Function GenerateMD5InputData() As IEnumerable(Of Integer)
    Return Enumerable.Range(1, NUM_MD5_HASHES)
End Function

Understanding ParallelLoopState

The instance of ParallelLoopState (loopState) offers two methods to cease the execution of a Parallel.For or Parallel.ForEach:

1. Break—Communicates that the parallel loop should cease the execution beyond the current iteration, as soon as possible
2. Stop—Communicates that the parallel loop should cease the execution as soon as possible

Note
Calling these methods doesn't guarantee that execution stops immediately, because parallel loops are complex, and sometimes it is difficult to cease the execution of all the parallel and concurrent iterations. The difference between Break and Stop is that after Break, the loop still runs all the iterations with an index lower than the one that called Break and doesn't start the ones beyond it, whereas Stop requests that no further iterations start at all.

The code shown previously from file Listing09.sln calls the Break method if the elapsed time is more than 3 seconds:

If (sw.Elapsed.Seconds > 3) Then
    loopState.Break()
    Exit Sub
End If

It is very important to note that the code in the multiline lambda is accessing the sw variable that is defined in ParallelForEachGenerateMD5HashesBreak. It reads the value of the Seconds read-only property.

It is also possible to check the value of the ShouldExitCurrentIteration read-only property in order to make decisions when the current or other concurrent iterations request that the parallel loop stop. The code in file Listing09.sln includes a few commented lines that check whether ShouldExitCurrentIteration is True:

If loopState.ShouldExitCurrentIteration Then
    Exit Sub
End If

If the property is true, then it exits the subroutine, avoiding the execution of unnecessary iterations. The lines are commented because in this case an additional iteration isn't a problem; therefore, it isn't necessary to add this additional instruction to each iteration.

Analyzing the Results of a Parallel Loop Execution

Once the Parallel.ForEach finishes its execution, loopResult has information about the results, in a ParallelLoopResult structure.

The DisplayParallelLoopResult subroutine shown in code file Listing09.sln receives a ParallelLoopResult structure, evaluates its read-only properties, and writes the result of executing the Parallel.ForEach loop to the console. The following table explains the three possible results in this example.

ParallelLoopResult Read-only Properties

| Condition | Description |
|---|---|
| IsCompleted = True | The loop ran to completion. |
| IsCompleted = False And LowestBreakIteration.HasValue = False | The loop ended prematurely because Stop was called. |
| IsCompleted = False And LowestBreakIteration.HasValue = True | The loop ended because Break was called. The LowestBreakIteration property holds the index of the lowest iteration that called Break. |

Note
It is very important to analyze the results of a parallel loop execution, because the fact that execution continues with the next statement doesn't mean that the loop completed all its iterations. Thus, it is necessary to check the values of the ParallelLoopResult properties or to include customized control mechanisms inside the loop bodies. Again, converting sequential code to parallel and concurrent code isn't just a matter of replacing a few loops; you need to understand a very different programming paradigm and the new structures designed for it.
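The following sketch makes the difference between Break and Stop, and the resulting ParallelLoopResult values, observable with a Parallel.For loop over 0 to 999 in which only iteration 100 calls Break or Stop. Because Break still lets the iterations below the breaking one run, the counter of iterations below 100 ends at 100 and LowestBreakIteration is 100; after Stop, LowestBreakIteration has no value. The module and function names are hypothetical.

```vb
Option Infer On

Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module BreakVersusStopSketch
    ' Iteration 100 calls Break. ranBelow100 receives how many iterations
    ' with an index lower than 100 were executed.
    Public Function RunWithBreak(ByRef ranBelow100 As Integer) As ParallelLoopResult
        Dim below As Integer = 0
        Dim result = Parallel.For(0, 1000,
            Sub(i As Integer, state As ParallelLoopState)
                If i < 100 Then Interlocked.Increment(below)
                If i = 100 Then state.Break()
            End Sub)
        ranBelow100 = below
        Return result
    End Function

    ' Iteration 100 calls Stop: the loop stops starting new iterations
    ' as soon as possible.
    Public Function RunWithStop() As ParallelLoopResult
        Return Parallel.For(0, 1000,
            Sub(i As Integer, state As ParallelLoopState)
                If i = 100 Then state.Stop()
            End Sub)
    End Function

    Sub Main()
        Dim ran As Integer
        Dim breakResult = RunWithBreak(ran)
        Console.WriteLine("Break: IsCompleted={0}, LowestBreakIteration={1}, ran below 100={2}",
                          breakResult.IsCompleted, breakResult.LowestBreakIteration, ran)

        Dim stopResult = RunWithStop()
        Console.WriteLine("Stop: IsCompleted={0}, LowestBreakIteration.HasValue={1}",
                          stopResult.IsCompleted, stopResult.LowestBreakIteration.HasValue)
    End Sub
End Module
```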

Catching Parallel Loop Exceptions

Because many iterations run in parallel, many exceptions can occur in parallel. The classic exception management techniques used in sequential code aren't enough on their own to handle exceptions thrown inside parallel loops.

When the code inside the delegate called for each parallelized iteration throws an exception that isn't caught inside the delegate, the exception becomes part of a set of exceptions that the parallel loop wraps in an instance of the new System.AggregateException class.
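The following hypothetical sketch, not one of the chapter's code files, shows this wrapping with a Parallel.For loop whose body throws an InvalidOperationException for every index that is a multiple of 250. How many inner exceptions the AggregateException contains depends on timing: once an exception is thrown, the loop stops starting new iterations, so here the count is between one and four.

```vb
Imports System
Imports System.Linq
Imports System.Threading.Tasks

Module AggregateExceptionSketch
    ' Runs a loop whose body fails for indexes 0, 250, 500, and 750, and
    ' returns the exceptions that Parallel.For wrapped in AggregateException.
    Public Function CollectLoopExceptions() As Exception()
        Try
            Parallel.For(0, 1000,
                Sub(i As Integer)
                    If i Mod 250 = 0 Then
                        Throw New InvalidOperationException(
                            "Iteration " & i.ToString() & " failed.")
                    End If
                End Sub)
        Catch ex As AggregateException
            ' Each exception that escaped the loop body is one inner exception.
            Return ex.InnerExceptions.ToArray()
        End Try
        ' Not reached: at least one failing iteration always runs.
        Return New Exception() {}
    End Function

    Sub Main()
        For Each inner As Exception In CollectLoopExceptions()
            Console.WriteLine("{0}: {1}", inner.GetType().Name, inner.Message)
        Next
    End Sub
End Module
```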

You have already learned how to handle exceptions in your sequential code in Chapter 6: “Exception Handling and Debugging.” You can apply almost the same techniques. The only difference arises when an exception is thrown inside the loop body, which is a delegate. The following code (code file: Listing10.sln) shows a new version of the ParallelForEachGenerateMD5Hashes subroutine, called ParallelForEachGenerateMD5HashesExceptions. Now, the body throws a TimeoutException if the elapsed time is more than three seconds:

If (sw.Elapsed.TotalSeconds > 3) Then
    Throw New TimeoutException(
        "Parallel.ForEach is taking more than 3 seconds to complete.")
End If

Sub ParallelForEachGenerateMD5HashesExceptions()
    Dim sw = Stopwatch.StartNew()
    Dim inputData = GenerateMD5InputData()
    Dim loopResult As ParallelLoopResult

    Try
        loopResult = Parallel.ForEach(inputData,
            Sub(number As Integer, loopState As ParallelLoopState)
                'If loopState.ShouldExitCurrentIteration Then
                '    Exit Sub
                'End If
                Dim md5M As MD5 = MD5.Create()
                Dim data = Encoding.Unicode.GetBytes(number.ToString())
                Dim result = md5M.ComputeHash(data)
                Dim hexString = ConvertToHexString(result)
                ' TotalSeconds, because Elapsed.Seconds is only the whole-seconds component
                If (sw.Elapsed.TotalSeconds > 3) Then
                    Throw New TimeoutException(
                        "Parallel.ForEach is taking more than 3 seconds to complete.")
                End If
                ' Console.WriteLine(ConvertToHexString(result))
            End Sub)
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something considering the innerEx Exception
        Next
    End Try
    DisplayParallelLoopResult(loopResult)
    Debug.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

A Try...Catch...End Try block encloses the call to Parallel.ForEach. Nevertheless, the line that catches the exceptions is

Catch ex As AggregateException  

instead of the classic

Catch ex As Exception  

An AggregateException contains one or more exceptions that occurred during the execution of parallel and concurrent code. However, this class isn't specifically for parallel computing; it can be used to represent one or more errors that occur during application execution. Once it is caught, you can iterate through each individual exception contained in its InnerExceptions read-only collection of Exception. In this case, the Parallel.ForEach without the custom partitioner will display the contents of many exceptions. Because Parallel.ForEach throws the AggregateException instead of returning a value, loopResult keeps its default value, so the loop result will look as if the loop was stopped by calling the Stop method. However, because you can catch the AggregateException, you can make decisions based on the problems that made it impossible to complete all the iterations. In this case, a sequential For Each loop retrieves all the information about each Exception in InnerExceptions. The following code (code file: Listing11.sln) converts each exception to a string and sends it to the Debug output; the output shown afterward includes the first two exceptions:

Catch ex As AggregateException
    For Each innerEx As Exception In ex.InnerExceptions
        Debug.WriteLine(innerEx.ToString())
        ' Do something considering the innerEx Exception
    Next
End Try

The output looks like this:

System.TimeoutException: Parallel.ForEach is taking more than 3 seconds to complete.
   at ConsoleApplication3.Module1._Closure$__2._Lambda$__9(Int32 number, ParallelLoopState loopState) in C:\Users\Public\Documents\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 255
   at System.Threading.Tasks.Parallel.<>c__DisplayClass32`2.<PartitionerForEachWorker>b__30()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )
System.TimeoutException: Parallel.ForEach is taking more than 3 seconds to complete.
   at ConsoleApplication3.Module1._Closure$__2._Lambda$__9(Int32 number, ParallelLoopState loopState) in C:\Users\Public\Documents\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 255
   at System.Threading.Tasks.Parallel.<>c__DisplayClass32`2.<PartitionerForEachWorker>b__30()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )

Note
As you can see in the previous lines, the two exceptions display the same information to the Debug output. However, most of the time you will use a more sophisticated exception management technique, and you will provide more information about the iteration that is generating the problem. This example focuses on the differences between an AggregateException and the traditional Exception. It doesn't promote the practice of writing information about errors to the Debug output as a complete exception management technique.
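The following minimal sketch shows the same AggregateException pattern with a self-contained Parallel.For loop instead of the MD5 generator. CountLoopExceptions is a hypothetical name, and the rule that every iteration whose index is a multiple of 100 throws an InvalidOperationException is an arbitrary assumption. Note that a Catch ex As InvalidOperationException clause placed around the loop would never run, because the loop rethrows the failures wrapped in a single AggregateException.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks

Module LoopExceptionSketch
    ' Iterations whose index is a multiple of 100 throw. Parallel.For stops
    ' launching new iterations and throws one AggregateException that holds
    ' every exception thrown by the iterations that were already running.
    Public Function CountLoopExceptions() As Integer
        Try
            Parallel.For(0, 1000,
                Sub(i As Integer)
                    If i Mod 100 = 0 Then
                        Throw New InvalidOperationException("Iteration " & i.ToString() & " failed.")
                    End If
                End Sub)
        Catch ex As AggregateException
            For Each innerEx As Exception In ex.InnerExceptions
                Console.WriteLine(innerEx.GetType().Name & ": " & innerEx.Message)
            Next
            Return ex.InnerExceptions.Count
        End Try
        Return 0 ' reached only if no iteration threw
    End Function

    Sub Main()
        ' The count varies between runs: it depends on how many failing
        ' iterations were already running when the first exception occurred.
        Console.WriteLine("Inner exceptions: " & CountLoopExceptions().ToString())
    End Sub
End Module
```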

Figure 19.8: Tasks and their relationship with threads

When you work with tasks, they run their code using underlying threads (software threads, scheduled on certain hardware threads, or logical cores). However, there isn't a one-to-one relationship between tasks and threads. This means you aren't creating a new thread each time you create a new task. The CLR creates the necessary threads to support the tasks' execution needs. Of course, this is a simplified view of what goes on when creating tasks.

Synchronizing code running in multiple threads is indeed complex. Thus, a task-based alternative offers an excellent opportunity to leave some synchronization problems behind, especially those regarding work scheduling mechanisms. The CLR uses work-stealing queues to reduce the locks and to schedule small work chunks without adding a significant overhead. Creating a new thread introduces a big overhead, whereas a new task is queued so that an existing thread can run it whenever possible. Therefore, tasks offer a new lightweight mechanism for parts of code capable of taking advantage of multiple cores.

The default task scheduler relies on an underlying thread pool engine. When you create a new task, the scheduler places it in one of the thread pool's queues. When a thread pool thread runs out of work in its own queue, it can steal queued tasks from the queues of other threads, and the thread pool creates new threads when necessary. The code included in a task runs in one thread, but this happens under the hood, and the overhead is smaller than manually creating a new thread.
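A quick way to observe that tasks and threads aren't mapped one-to-one is to record which managed thread runs each task. This is a minimal sketch under stated assumptions: CountDistinctThreads is a hypothetical name, the 200 tasks and the Thread.SpinWait call are arbitrary stand-ins for small chunks of work, and the number of distinct threads varies with the machine and the thread pool's decisions.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module TaskThreadSketch
    ' Starts taskCount short tasks and records the ID of the managed thread
    ' that ran each one. The default scheduler reuses thread pool threads,
    ' so the number of distinct IDs is usually far lower than taskCount.
    Public Function CountDistinctThreads(ByVal taskCount As Integer) As Integer
        Dim threadIds As New ConcurrentDictionary(Of Integer, Boolean)()
        Dim workers(taskCount - 1) As Task
        For i As Integer = 0 To taskCount - 1
            workers(i) = Task.Factory.StartNew(
                Sub()
                    threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, True)
                    Thread.SpinWait(10000) ' a small amount of CPU-bound work
                End Sub)
        Next
        Task.WaitAll(workers)
        Return threadIds.Count
    End Function

    Sub Main()
        Console.WriteLine("200 tasks ran on " & CountDistinctThreads(200).ToString() &
                          " distinct threads.")
    End Sub
End Module
```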

System.Threading.Tasks.Task

So far, TPL has been creating instances of System.Threading.Tasks.Task under the hood in order to support the parallel execution of the iterations. In addition, calling Parallel.Invoke also creates Task instances to run the delegates it receives.

A Task represents an asynchronous operation. It offers many methods and properties that enable you to control its execution and get information about its status. The creation of a Task is independent of its execution. This means that you have complete control over the execution of the associated operation.
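Because creation and execution are separate steps, a Task created with its constructor does nothing until you start it. The following minimal sketch (ObserveStatuses is a hypothetical name) reads the Status property before calling Start and again after waiting for the task.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks

Module TaskCreationSketch
    ' Returns the status observed before Start and after Wait.
    Public Function ObserveStatuses() As TaskStatus()
        Dim t As New Task(Sub() Console.WriteLine("Running the task body"))
        Dim beforeStart = t.Status   ' TaskStatus.Created: nothing is running yet
        t.Start()                    ' schedule the delegate on the default scheduler
        t.Wait()                     ' block until the delegate finishes
        Dim afterWait = t.Status     ' TaskStatus.RanToCompletion
        Return New TaskStatus() {beforeStart, afterWait}
    End Function

    Sub Main()
        For Each status In ObserveStatuses()
            Console.WriteLine(status.ToString())
        Next
    End Sub
End Module
```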


Note
When you launch many asynchronous operations as Task instances, the task scheduler will try to run them in parallel in order to load-balance all the available logical cores at run time. However, it isn't convenient to use tasks to run any existing piece of code, because tasks add an overhead. Sometimes it doesn't make sense to use tasks. Although this overhead is smaller than that added by a thread, it is still an overhead that has to be considered. For example, it doesn't make sense to create tasks to run two lines of code as two independent asynchronous tasks that solve very simple calculations. Remember to measure the speedups achieved between the parallel execution and the sequential version to decide whether parallelism is appropriate or not.

The following table describes the main read-only properties of a Task instance.

Task Read-only Properties

Property | Description
--- | ---
AsyncState | A state Object supplied when you created the Task instance.
CreationOptions | The TaskCreationOptions enum value used to provide hints to the task scheduler in order to help it make the best scheduling decisions.
CurrentId | A Shared property that returns the unique ID of the Task currently being executed, or Nothing when called from code that isn't running inside a task. It is not equivalent to a thread ID in unmanaged code.
Exception | The AggregateException that caused the Task to end prematurely. It is Nothing if the Task hasn't thrown any unhandled exceptions.
Factory | A Shared property that provides access to the factory methods that allow the creation of Task instances with and without results.
Id | The unique ID for the Task instance.
IsCanceled | A Boolean value indicating whether the Task instance was canceled.
IsCompleted | A Boolean value indicating whether the Task has completed its execution. It is True once the task reaches any of its three final states, including the canceled and faulted ones.
IsFaulted | A Boolean value indicating whether the Task has aborted its execution due to an unhandled exception.
Status | The TaskStatus value indicating the current stage in the life cycle of a Task instance.
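Several of these properties can be read from a task that was created with a state object and a creation hint. In this minimal sketch, InspectTask is a hypothetical name, and the "AES batch 1" state string and the LongRunning hint are arbitrary examples. It also shows the difference between the instance Id and the Shared CurrentId, which only has a value inside a running task.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks

Module TaskPropertiesSketch
    ' Creates, runs, and returns a task so that its read-only properties can
    ' be inspected afterward. idSeenInside receives Task.CurrentId as read
    ' from inside the task's own body.
    Public Function InspectTask(ByRef idSeenInside As Integer?) As Task
        Dim seen As Integer? = Nothing
        Dim t As New Task(
            Sub(state As Object)
                seen = Task.CurrentId ' the ID of the task running this code
                Console.WriteLine("Processing " & CStr(state))
            End Sub,
            "AES batch 1",
            TaskCreationOptions.LongRunning)
        t.Start()
        t.Wait()
        idSeenInside = seen
        Return t
    End Function

    Sub Main()
        Dim insideId As Integer?
        Dim t = InspectTask(insideId)
        Console.WriteLine("Id: " & t.Id.ToString() & ", CurrentId inside the body: " & insideId.ToString())
        Console.WriteLine("AsyncState: " & CStr(t.AsyncState))
        Console.WriteLine("CreationOptions: " & t.CreationOptions.ToString())
        Console.WriteLine("Status: " & t.Status.ToString() & ", IsFaulted: " & t.IsFaulted.ToString())
        ' Main isn't running inside a task, so CurrentId has no value here
        Console.WriteLine("CurrentId in Main has a value: " & Task.CurrentId.HasValue.ToString())
    End Sub
End Module
```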

Understanding a Task's Life Cycle

Each Task instance has a life cycle. However, a task represents concurrent code potentially running in parallel, according to the possibilities offered by the underlying hardware and the availability of resources at run time. Therefore, any information about the Task instance could change as soon as you retrieve it, because its state changes concurrently.

A Task instance completes its life cycle just once. After it reaches one of its three possible final states, it doesn't go back to any previous state, as shown in the state diagram in Figure 19.9.

Figure 19.9: Status diagram for a Task instance

A Task instance has three possible initial states, depending on how it was created, as described in the following table.

Initial States for a Task Instance

Value | Description
--- | ---
TaskStatus.Created | A Task instance created using the Task constructor has this initial state. It will change once there is a call to either Start or RunSynchronously, or if the task is canceled.
TaskStatus.WaitingForActivation | This is the initial state for tasks created through methods that allow the definition of continuations—that is, tasks that aren't scheduled until other dependent tasks finish their execution.
TaskStatus.WaitingToRun | This is the initial state for a task created through TaskFactory.StartNew. It is waiting for the specified scheduler to pick it up and run it.
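The first two initial states can be observed reliably with a task that hasn't been started and a continuation attached to it. This is a minimal sketch, and ObserveInitialStates is a hypothetical name. WaitingToRun isn't asserted here, because a task created through StartNew may already be running by the time its Status is read.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks

Module InitialStatesSketch
    ' Returns the antecedent's initial status, the continuation's initial
    ' status, and the continuation's status once it has finished.
    Public Function ObserveInitialStates() As TaskStatus()
        ' Created with the constructor: stays Created until Start is called
        Dim antecedent As New Task(Sub() Console.WriteLine("Antecedent running"))
        ' A continuation isn't scheduled until its antecedent finishes
        Dim continuation = antecedent.ContinueWith(
            Sub(previous As Task) Console.WriteLine("Continuation running"))

        Dim antecedentInitial = antecedent.Status     ' Created
        Dim continuationInitial = continuation.Status ' WaitingForActivation

        antecedent.Start()
        continuation.Wait()
        Return New TaskStatus() {antecedentInitial, continuationInitial, continuation.Status}
    End Function

    Sub Main()
        For Each status In ObserveInitialStates()
            Console.WriteLine(status.ToString())
        Next
    End Sub
End Module
```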

Next, the task status can transition to the TaskStatus.Running state, and finally move to a final state. If the task has attached child tasks when its own body finishes, it isn't considered complete yet and transitions to the TaskStatus.WaitingForChildrenToComplete state. Once its child tasks complete, the task moves to one of the three possible final states shown in the following table.

Final States for a Task Instance

Value | Description
--- | ---
TaskStatus.Canceled | A cancellation request arrived before the task started its execution, or during its execution and the task acknowledged it by throwing an OperationCanceledException for its own token (for example, through ThrowIfCancellationRequested). The IsCanceled property will be True.
TaskStatus.Faulted | An unhandled exception in its body or the bodies of its children made the task end. The IsFaulted property will be True and the Exception property will be non-null and will hold the AggregateException that caused the task or its children to end prematurely.
TaskStatus.RanToCompletion | The task completed its execution. It ran to the end of its body without being canceled or throwing an unhandled exception. The IsCompleted property will be True. In addition, IsCanceled and IsFaulted will both be False.
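The following minimal sketch drives three tasks into the three final states. ReachFinalStates is a hypothetical name; the thrown TimeoutException and the already-canceled token are arbitrary ways of producing the Faulted and Canceled states. It also shows that Task.WaitAll surfaces both the fault and the cancellation through one AggregateException, while the faulted task keeps its own AggregateException in its Exception property.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module FinalStatesSketch
    ' Returns the final statuses in the order RanToCompletion, Faulted, Canceled.
    Public Function ReachFinalStates() As TaskStatus()
        ' The body ends normally
        Dim okTask = Task.Factory.StartNew(Sub() Thread.SpinWait(1000))

        ' The body throws an unhandled exception
        Dim faultyTask = Task.Factory.StartNew(
            Sub()
                Throw New TimeoutException("Simulated failure")
            End Sub)

        ' The token is already canceled, so this task never runs its body
        Dim cts As New CancellationTokenSource()
        cts.Cancel()
        Dim canceledTask = Task.Factory.StartNew(Sub() Thread.SpinWait(1000), cts.Token)

        Try
            Task.WaitAll(okTask, faultyTask, canceledTask)
        Catch ex As AggregateException
            ' Holds the TimeoutException and a TaskCanceledException
            For Each innerEx As Exception In ex.InnerExceptions
                Console.WriteLine("WaitAll rethrew: " & innerEx.GetType().Name)
            Next
        End Try

        Console.WriteLine("faultyTask.Exception: " & faultyTask.Exception.InnerExceptions(0).Message)
        ' IsCompleted is True in all three final states
        Console.WriteLine(okTask.IsCompleted AndAlso faultyTask.IsCompleted AndAlso canceledTask.IsCompleted)
        Return New TaskStatus() {okTask.Status, faultyTask.Status, canceledTask.Status}
    End Function

    Sub Main()
        For Each status In ReachFinalStates()
            Console.WriteLine(status.ToString())
        Next
    End Sub
End Module
```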

Using Tasks to Parallelize Code

In a previous example, you used Parallel.Invoke to launch two subroutines in parallel:

Parallel.Invoke(Sub() GenerateAESKeys(), Sub() GenerateMD5Hashes())  

It is possible to do the same job using two instances of Task, as shown in the following code (code file: Listing13.sln). Working with Task instances offers more flexibility to schedule and start independent and chained tasks that can take advantage of multiple cores.

' Create the tasks
Dim t1 = New Task(Sub() GenerateAESKeys())
Dim t2 = New Task(Sub() GenerateMD5Hashes())
' Start the tasks
t1.Start()
t2.Start()
' Wait for all the tasks to finish
Task.WaitAll(t1, t2)

The first two lines create two instances of Task with a lambda expression to create a delegate for GenerateAESKeys and GenerateMD5Hashes. t1 is associated with the first subroutine, and t2 with the second. It is also possible to use multiline lambda expression syntax to define the action that the Task constructor receives as a parameter. At this point, the Status for both Task instances is TaskStatus.Created. The subroutines aren't running yet, but the code continues with the next line.

Starting Tasks

Then, the following line starts the asynchronous execution of t1:

t1.Start()  

The Start method initiates the execution of the delegate in an independent way, and the program flow continues with the instruction after this method, even though the delegate has not finished its execution. The code in the delegate associated with the task runs concurrently and potentially in parallel with the main program flow, the main thread. This means that at this point, there is a main thread and another thread or threads supporting the execution of this new task.

The execution of the main program flow, the main thread, is synchronous. This means that it will continue with the next instruction, the line that starts the asynchronous execution of t2:

t2.Start()  

Now the Start method initiates the execution of the delegate in another independent way and the program flow continues with the instruction after this method, even though this other delegate has not finished its execution. The code in the delegate associated with the task runs concurrently and potentially in parallel with the main thread and the code inside GenerateAESKeys that is already running. This means that at this point, there is a main thread and other threads supporting the execution of the two tasks.


Note
It is indeed easy to run asynchronous code using Task instances and the latest language improvements added to Visual Basic. With just a few lines, you can create code that runs asynchronously, control its execution flow, and take advantage of multicore microprocessors or multiple processors. Visual Basic 2012 and .NET Framework 4.5 went a step further with the addition of the Async modifier and the Await operator, which simplify writing asynchronous operations.

The sequence diagram in Figure 19.10 shows the parallel and asynchronous execution flow for the main thread and the two tasks.

Figure 19.10: Parallel and asynchronous execution flow for a main thread and two tasks

Visualizing Tasks Using Parallel Tasks and Parallel Stacks

The Visual Basic 2012 IDE improves the two debugging windows introduced in Visual Basic 2010: Parallel Tasks and Parallel Stacks. These windows offer information about the tasks that are running, including their status and their relationship with the underlying threads. It is very easy to understand the relationship between the tasks and the underlying threads in .NET Framework 4.5 when debugging simple parallelized code and running it step-by-step using the debugging windows, which enable you to monitor what is going on under the hood. By running the code step-by-step, you can see the differences between synchronous and asynchronous execution.

For example, if you insert a breakpoint on the line Task.WaitAll(t1, t2) and your microprocessor has at least two cores, you will be able to see two tasks running in parallel while debugging. To do so, select Debug ⇒ Windows ⇒ Parallel Tasks (Ctrl + Shift + D, K) while debugging the application. The IDE will display the Parallel Tasks window shown in Figure 19.11, which includes a list of all the tasks and their status (Active).

Figure 19.11: Parallel Tasks dialogue with two active tasks

There are two tasks, and each of them is assigned to a different thread. The status for both tasks is Running, and they are identified by an auto-generated lambda name and number, <lambda10>() and <lambda11>(). This happens because the code uses lambda expressions to generate the delegates associated with each task.

If you double-click on a task name, the IDE will display the next statement that is going to run for the selected task. Remember that the threads assigned to these tasks and the main thread are running concurrently and potentially in different logical cores, according to the available hardware resources and the decisions taken by the schedulers.


Note
The default task scheduler tries to run each task on the most appropriate underlying thread pool thread, and idle threads can steal queued work from busy ones. The thread pool can also decide to create a new thread to support the task's execution. The operating system scheduler distributes the cores between the dozens or hundreds of threads scheduled to receive processor time from the available cores. This is why the same code can run with different parallelism levels and different concurrent times on the same hardware configuration.

You can check what is going on with each different concurrent task. You have similar options to those offered by previous Visual Basic versions with threads, but the information is better because you can check whether a task is scheduled or waiting-deadlocked. You can also order and group the information shown in the windows, as you can with any other Visual Basic IDE feature.

The Parallel Tasks grid includes a column named Thread Assignment. This number is the ID shown in the Threads window. Thus, you know which managed thread is supporting the execution of a certain task while debugging. You can also check the next statement and additional detailed information for each different thread. To do so, select Debug ⇒ Windows ⇒ Threads (Ctrl + Alt + H) while debugging the application. The IDE will display the Threads dialog shown in Figure 19.12, which includes a list of all the threads, their category, and their locations.

Figure 19.12: Threads dialog with 5 worker threads and the main thread

Note
There is a simpler way to visualize the relationship between tasks and threads. You can select Debug ⇒ Windows ⇒ Parallel Stacks (Ctrl + Shift + D, S). The IDE will display the Parallel Stacks window shown in Figure 19.13, which includes a diagram with all the tasks or threads, their status, and their relationships. The default view is Threads.

Figure 19.13: Parallel Stacks window displaying the default Threads view

The two threads on the right side of the diagram are running the code scheduled by the two tasks. Each thread shows its call stack. The thread that supports Module1.<lambda10> is running the GenerateAESKeys subroutine — specifically, code inside the call to the ConvertToHexString subroutine. The thread that supports Module1.<lambda11> is running the GenerateMD5Hashes subroutine. This diagram indicates what each thread is doing with a great level of detail.

You can change the value for the combo box in the upper-left corner from Threads to Tasks, and the IDE will display a diagram with all the tasks, including their status, relationships, and the call stack, as shown in Figure 19.14.

Figure 19.14: Parallel Stacks window displaying the Tasks view

Waiting for Tasks to Finish

At some point, you need to wait for certain tasks, started with an asynchronous execution, to finish. The following line calls the Task.WaitAll method, which will wait for the Task instances received as a ParamArray, separated by commas. This method has a synchronous execution, which means that the main thread won't continue with the next statement until the Task instances received as parameters finish their execution.

Task.WaitAll(t1, t2)  

Here, t1 and t2 have to finish their execution. The current thread (in this case, the main thread) will wait until both tasks finish their execution. It is important to note that this waiting isn't a loop that continuously checks a status and consumes a lot of CPU cycles. The WaitAll method uses a lightweight mechanism to reduce the need for CPU cycles as much as possible. Once these tasks finish their execution, the next statement will run.

Because the WaitAll method uses a synchronous execution, if the tasks take one minute to run, then the thread where this method was called (in this case, the main thread) will be waiting for this amount of time. Therefore, sometimes you want to limit the number of milliseconds to wait for the tasks to finish. You can use another definition of the Task.WaitAll method that accepts an array of Task instances and the number of milliseconds to wait. The method returns a Boolean value indicating whether the tasks were able to finish within the specified timeout. The following code waits for t1 and t2 to finish their execution with a three-second timeout (code file: Snippet03.sln):

If Task.WaitAll(New Task() {t1, t2}, 3000) = False Then
    Console.WriteLine(
        "GenerateAESKeys and GenerateMD5Hashes are taking more than 3 seconds to complete.")
    Console.WriteLine(t1.Status.ToString())
    Console.WriteLine(t2.Status.ToString())
End If

If t1 and t2 don't finish in three seconds, the code displays a message and the status for both tasks. If no exceptions occurred in the code for these tasks, they could still be running. The Task.WaitAll method with a specific timeout doesn't cancel the tasks if they take more time to run; it just returns from its synchronous execution with the Boolean result.
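To see that a timeout only stops the waiting and not the task, the following minimal sketch uses a hypothetical task that sleeps for two seconds as a stand-in for GenerateAESKeys, and waits for it with a 100-millisecond timeout. WaitWithTimeout and both durations are assumptions chosen only so that the timeout always expires first.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module WaitTimeoutSketch
    ' Returns whether WaitAll finished within the timeout; ranToCompletionLater
    ' reports whether the task still completed its work afterward.
    Public Function WaitWithTimeout(ByRef ranToCompletionLater As Boolean) As Boolean
        ' Stand-in for a long-running job such as GenerateAESKeys
        Dim slowTask = Task.Factory.StartNew(Sub() Thread.Sleep(2000))

        ' Wait at most 100 milliseconds; this doesn't cancel slowTask
        Dim finishedInTime = Task.WaitAll(New Task() {slowTask}, 100)
        If Not finishedInTime Then
            Console.WriteLine("Timed out; status: " & slowTask.Status.ToString())
        End If

        ' The task keeps running and can be waited for again later
        slowTask.Wait()
        ranToCompletionLater = (slowTask.Status = TaskStatus.RanToCompletion)
        Return finishedInTime
    End Function

    Sub Main()
        Dim completedLater As Boolean
        Dim inTime = WaitWithTimeout(completedLater)
        Console.WriteLine("Finished in time: " & inTime.ToString() &
                          ", completed later: " & completedLater.ToString())
    End Sub
End Module
```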

It is also possible to call the Wait method for a Task instance. In this case, the current thread will wait until that task finishes its execution. Of course, there is no need to send the task instance as a parameter because the Wait method is an instance method. The Task.Wait method also supports a timeout in one of its definitions. The following code waits for t1 to finish, and if it doesn't complete its work in three seconds, it displays a message and its status (code file: Snippet04.sln):

If t1.Wait(3000) = False Then
    Console.WriteLine("GenerateAESKeys is taking more than 3 seconds to complete.")
    Console.WriteLine(t1.Status.ToString())
End If

Cancelling Tasks Using Tokens

You can interrupt the execution of Task instances through the use of cancellation tokens. To do so, it is necessary to add some code in the delegate in order to create a cancelable operation that is capable of terminating in a timely manner.

The following code (code file: Listing14.sln) shows two new versions of the AES keys and MD5 hash generators. The changes made to support cancellation are the lines that use the ct parameter. The new GenerateAESKeysCancel, replacing the old GenerateAESKeys, receives a System.Threading.CancellationToken instance and throws an OperationCanceledException by calling the ThrowIfCancellationRequested method. This way, the Task instance transitions to the TaskStatus.Canceled state and the IsCanceled property will be True.

Private Sub GenerateAESKeysCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine("AES: " + ConvertToHexString(result))
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Private Sub GenerateMD5HashesCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim md5M As MD5 = MD5.Create()
    For i As Integer = 1 To NUM_MD5_HASHES
        Dim data = Encoding.Unicode.GetBytes(i.ToString())
        Dim result = md5M.ComputeHash(data)
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine(ConvertToHexString(result))
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Debug.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

    Dim t1 = Task.Factory.StartNew(Sub() GenerateAESKeysCancel(ct), ct)
    Dim t2 = Task.Factory.StartNew(Sub() GenerateMD5HashesCancel(ct), ct)

    ' Sleep the main thread for 1 second
    Threading.Thread.Sleep(1000)

    cts.Cancel()

    Try
        If Task.WaitAll(New Task() {t1, t2}, 1000) = False Then
            Console.WriteLine(
                "GenerateAESKeys and GenerateMD5Hashes are taking more than 1 second to complete.")
            Console.WriteLine(t1.Status.ToString())
            Console.WriteLine(t2.Status.ToString())
        End If
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
    End Try
    If t1.IsCanceled Then
        Console.WriteLine("The task running GenerateAESKeysCancel was canceled.")
    End If
    If t2.IsCanceled Then
        Console.WriteLine(
            "The task running GenerateMD5HashesCancel was canceled.")
    End If
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub

The first line of GenerateAESKeysCancel will throw the aforementioned exception if its cancellation was already requested at that time. This way, it won't start the loop if unnecessary at that point.

ct.ThrowIfCancellationRequested()  

In addition, after each iteration of the loop, new code checks the token's IsCancellationRequested. If it is True, it calls the ThrowIfCancellationRequested method. Before calling this method, when IsCancellationRequested is True, it is possible to add cleanup code when necessary:

If ct.IsCancellationRequested Then
    ' It is important to add cleanup code here when necessary
    ct.ThrowIfCancellationRequested()
End If

Doing this adds a small amount of overhead to each iteration of the loop. When the delegate throws an OperationCanceledException, the Task instance compares the exception's token with its own associated token. If they are the same and cancellation was requested on that token, the Task instance treats the exception as an acknowledgment of the cancellation request and makes the transition to the Canceled state, interrupting its execution. When there is code waiting for the canceled Task instance, that code receives a TaskCanceledException, which is wrapped in an AggregateException.
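The token comparison matters: if the delegate throws an OperationCanceledException for a token other than the one passed to StartNew, the task ends in the Faulted state instead of Canceled. The following minimal sketch demonstrates both outcomes. FinalStatus is a hypothetical name, and the ManualResetEventSlim is only there to make sure the body is running before either token is canceled.

```vbnet
Option Strict On
Option Infer On

Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module TokenMatchSketch
    ' The task is created with taskCts.Token. Its body throws an
    ' OperationCanceledException for either that same token or an unrelated one.
    Public Function FinalStatus(ByVal useSameToken As Boolean) As TaskStatus
        Dim taskCts As New CancellationTokenSource()
        Dim otherCts As New CancellationTokenSource()
        Dim thrownToken = If(useSameToken, taskCts.Token, otherCts.Token)
        Dim bodyStarted As New ManualResetEventSlim(False)

        Dim t = Task.Factory.StartNew(
            Sub()
                bodyStarted.Set()
                ' Block until the chosen token is canceled, then acknowledge it
                thrownToken.WaitHandle.WaitOne()
                thrownToken.ThrowIfCancellationRequested()
            End Sub, taskCts.Token)

        bodyStarted.Wait() ' cancel only after the body is running
        taskCts.Cancel()
        otherCts.Cancel()

        Try
            t.Wait()
        Catch ex As AggregateException
            ' Thrown in both cases; the Status property tells them apart
        End Try
        Return t.Status
    End Function

    Sub Main()
        Console.WriteLine("Same token:      " & FinalStatus(True).ToString())  ' Canceled
        Console.WriteLine("Different token: " & FinalStatus(False).ToString()) ' Faulted
    End Sub
End Module
```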

In this case, the main subroutine creates a CancellationTokenSource, cts, and a Cancellation Token, ct:

Dim cts As New System.Threading.CancellationTokenSource()
Dim ct As System.Threading.CancellationToken = cts.Token

CancellationTokenSource is capable of initiating cancellation requests, and CancellationToken communicates them to asynchronous operations.

It is necessary to send a CancellationToken as a parameter to each task delegate; therefore, the code uses one of the definitions of the TaskFactory.StartNew method. The following lines create and start two Task instances with associated actions and the same CancellationToken instance (ct) as parameters:

Dim t1 = Task.Factory.StartNew(Sub() GenerateAESKeysCancel(ct), ct)
Dim t2 = Task.Factory.StartNew(Sub() GenerateMD5HashesCancel(ct), ct)

The preceding lines use the Task class Factory property to retrieve a TaskFactory instance that can be used to create tasks with more options than those offered by direct instantiation of the Task class. In this case, it uses the StartNew method, which is functionally equivalent to creating a Task using one of its constructors and then calling Start to schedule it for execution.

Then, the code calls the Sleep method to make the main thread sleep for one second. This method suspends the current thread for the indicated time—in this case, specified as an Integer in milliseconds:

Threading.Thread.Sleep(1000)  

Note
The main thread remains suspended for one second, but the threads that are supporting the tasks' execution won't be suspended. Therefore, the tasks will be scheduled to begin their execution.

One second later, the main thread communicates a request for cancellation for both tasks through the CancellationTokenSource instance's Cancel method:

cts.Cancel()  

The cancellation token is evaluated in the two delegates launched by the Task instances, as previously explained.

With just a few added lines, it is easy to cancel asynchronous actions. However, it is very important to add the necessary cleanup code.

A Try...Catch...End Try block encloses the call to Task.WaitAll. Because there was a request for cancellation for both tasks, the AggregateException will contain two benign exceptions of type TaskCanceledException, a class that derives from OperationCanceledException.

The IsCanceled property for both tasks is going to be True. By checking this property, you can run specific code whenever a task was canceled.

Handling Exceptions Thrown by Tasks

As many tasks run in parallel, many exceptions can occur in parallel. Task instances also work with a set of exceptions, wrapped in the previously explained System.AggregateException class.

The following code (code file: Listing15.sln) shows the lines that add an unhandled exception in the GenerateAESKeysCancel subroutine: the new If block throws a TimeoutException when the elapsed time exceeds 0.5 seconds.

Comment out the code that requested cancellation for both tasks:

' cts.Cancel()

Private Sub GenerateAESKeysCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine("AES: " + ConvertToHexString(result))
        ' Elapsed.Seconds is a whole-number component, so compare TotalSeconds
        If (sw.Elapsed.TotalSeconds > 0.5) Then
            Throw New TimeoutException(
                "GenerateAESKeysCancel is taking more than 0.5 seconds to complete.")
        End If
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Add the following lines to the Main subroutine:

If t1.IsFaulted Then
    For Each innerEx As Exception In t1.Exception.InnerExceptions
        Debug.WriteLine(innerEx.ToString())
        ' Do something else considering the innerEx Exception
    Next
End If

Because there is an unhandled exception in t1, its IsFaulted property is True. Therefore, t1.Exception, an AggregateException, contains one or more exceptions that occurred during the execution of its associated delegate. After checking the IsFaulted property, it is possible to iterate through each individual exception contained in the InnerExceptions read-only collection of Exception. You can make decisions according to the problems that made it impossible to complete the task. The following lines show the information about the unhandled exception converted to a string and sent to the Debug output.

System.TimeoutException: GenerateAESKeysCancel is taking more than 0.5 seconds to complete.
   at ConsoleApplication3.Module1.GenerateAESKeysCancel(CancellationToken ct) in C:\Wrox\Professional_VB_2012\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 427
   at ConsoleApplication3.Module1._Closure$__3._Lambda$__11() in C:\Wrox\Professional_VB_2012\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 337
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.Execute()

Note
Unhandled exceptions inside asynchronous operations are usually complex problems, because sometimes you need to perform important cleanup operations. For example, when an exception occurs, you can have partial results, and you could have to remove these values if the job doesn't complete because of an exception. Thus, you have to consider cleanup operations when working with tasks.

Returning Values from Tasks

So far, task instances did not return values; they were delegates running subroutines. However, it is also possible to return values from tasks, invoking functions and using Task(Of TResult) instances, where TResult has to be replaced by the returned type.

The following code (code file: Listing17.sln) shows the code for a new function that generates the well-known AES keys and then returns a list of the ones that begin with the character prefix received as one of the parameters (prefix). GenerateAESKeysWithCharPrefix returns a List of String.

The Main subroutine uses the TaskFactory.StartNew method again, but this time it calls it through Task(Of TResult).Factory instead of Task.Factory. Specifically, it creates a Task(Of List(Of String)) instance and passes the CancellationToken both to the task delegate and to StartNew:

Dim t1 = Task(Of List(Of String)).Factory.StartNew(
    Function() GenerateAESKeysWithCharPrefix(ct, "A"c), ct)

The delegate is a function that returns a List(Of String), which is going to be available in the Task(Of TResult) instance (t1) through its Result property after the associated delegate completes its execution and the function returns a value.

The main thread waits for t1 to finish and then checks whether it completed its execution, checking the previously explained Task instance properties. Then, it iterates through each string in the list returned by the function called in the previous task, and displays the results on the console. It does this job by running a new asynchronous task, t2 (code file: Listing17.sln).

' Assumes Imports System.Security.Cryptography and System.Threading.Tasks
' at the top of the module file
Private Function GenerateAESKeysWithCharPrefix(
    ByVal ct As System.Threading.CancellationToken,
    ByVal prefix As Char) As List(Of String)

    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    Dim keysList As New List(Of String)
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        If Left(hexString, 1) = prefix Then
            keysList.Add(hexString)
        End If
        If ct.IsCancellationRequested Then
            ' It is important to add cleanup code here
            ct.ThrowIfCancellationRequested()
        End If
    Next
    ' Display the elapsed time before returning; code after Return never runs
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
    Return keysList
End Function

Sub Main()
    Dim sw = Stopwatch.StartNew()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

    Dim t1 = Task(Of List(Of String)).Factory.StartNew(
        Function() GenerateAESKeysWithCharPrefix(ct, "A"c), ct)

    Try
        t1.Wait()
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
    End Try
    If t1.IsCanceled Then
        Console.WriteLine(
            "The task running GenerateAESKeysWithCharPrefix was canceled.")
        Exit Sub
    End If
    If t1.IsFaulted Then
        For Each innerEx As Exception In t1.Exception.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
        Exit Sub
    End If

    Dim t2 = Task.Factory.StartNew(Sub()
                                       ' Do something with the result
                                       ' returned by the task's delegate
                                       For i As Integer = 0 To t1.Result.Count - 1
                                           Console.WriteLine(t1.Result(i))
                                       Next
                                   End Sub, TaskCreationOptions.LongRunning)

    ' Wait for the user to press a key while t2 is displaying the results
    Console.ReadLine()
End Sub
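The same Task(Of TResult) pattern fits in a few lines. The following self-contained sketch uses Task.Run, which .NET Framework 4.5 added as a simpler alternative to Task.Factory.StartNew that schedules the delegate on the default task scheduler. EvenNumbersUpTo is a hypothetical stand-in for GenerateAESKeysWithCharPrefix, so the example doesn't depend on the chapter's helper functions.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks

Module TaskResultSketch
    ' Hypothetical helper: returns the even numbers in 1..max
    Function EvenNumbersUpTo(max As Integer) As List(Of Integer)
        Dim evens As New List(Of Integer)
        For i As Integer = 2 To max Step 2
            evens.Add(i)
        Next
        Return evens
    End Function

    Function StartEvenNumbersTask(max As Integer) As Task(Of List(Of Integer))
        ' Task.Run infers Task(Of List(Of Integer)) from the lambda's return type
        Return Task.Run(Function() EvenNumbersUpTo(max))
    End Function

    Sub Main()
        Dim t1 = StartEvenNumbersTask(10)
        ' Reading Result blocks until the task finishes, as Wait would
        For Each n In t1.Result
            Console.WriteLine(n)
        Next
    End Sub
End Module
```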

TaskCreationOptions

The code creates and starts the second task, t2, using the StartNew method and multiline lambda expression syntax. However, in this case, it uses a different overload that receives a TaskCreationOptions parameter, which specifies flags that control optional behavior for the creation, scheduling, and execution of tasks.

The following table describes four members of the TaskCreationOptions enumeration. (.NET Framework 4.5 also adds DenyChildAttach and HideScheduler.)

Optional Behaviors for Tasks

Value Description
TaskCreationOptions.AttachedToParent The task is attached to a parent task. You can create tasks inside other tasks.
TaskCreationOptions.None The task can use the default behavior.
TaskCreationOptions.LongRunning The task will take a long time to run. Therefore, the scheduler can work with it as a coarse-grained operation. You can use this option if the task is likely to take many seconds to run. It is not advisable to use this option when a task takes less than one second to run.
TaskCreationOptions.PreferFairness This option tells the scheduler that tasks scheduled sooner should be run sooner and tasks scheduled later should be run later.

Note
It is possible to combine multiple TaskCreationOptions enum values using bitwise operations.
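For example, the following sketch (hypothetical code, not from the chapter's downloads) combines LongRunning and PreferFairness with the Or operator and passes the result to TaskFactory.StartNew; the task's CreationOptions property then reports both flags.

```vbnet
Imports System
Imports System.Threading.Tasks

Module CreationOptionsSketch
    ' Starts a task whose creation options combine two flags
    Function StartCombinedTask() As Task(Of Integer)
        ' TaskCreationOptions is a flags enumeration, so Or combines values
        Dim options = TaskCreationOptions.LongRunning Or
                      TaskCreationOptions.PreferFairness
        Return Task.Factory.StartNew(Function() 21 * 2, options)
    End Function

    Sub Main()
        Dim t = StartCombinedTask()
        Console.WriteLine("LongRunning: {0}",
                          t.CreationOptions.HasFlag(TaskCreationOptions.LongRunning))
        Console.WriteLine("PreferFairness: {0}",
                          t.CreationOptions.HasFlag(TaskCreationOptions.PreferFairness))
        Console.WriteLine("Result: {0}", t.Result)
    End Sub
End Module
```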

Chaining Two Tasks Using Continuations

Clearly, the previous case shows an example of chained tasks. Task t1 produces a result, and t2 needs it as an input in order to start processing it. In these cases, instead of adding many lines that check for the successful completion of an antecedent task and then schedule a new task, TPL enables you to chain tasks using continuations.

You can call the ContinueWith method for any task instance to create a continuation that executes when this task (the antecedent) completes its execution. By default, the continuation runs whether the antecedent finished successfully, faulted, or was canceled; you can pass TaskContinuationOptions values such as OnlyOnRanToCompletion to restrict when it runs. ContinueWith has many overloads, the simplest of which receives an action, just as when creating Task instances; the action receives the antecedent task as its parameter.

The following lines show a simplified version of the code used in the previous example to display the results generated by t1 (code file: Snippet05.sln):

Dim t1 = Task(Of List(Of String)).Factory.StartNew(
    Function() GenerateAESKeysWithCharPrefix(ct, "A"c), ct)

Dim t2 = t1.ContinueWith(Sub(t)
                             ' Do something with the result returned
                             ' by the task's delegate
                             For i As Integer = 0 To t.Result.Count - 1
                                 Console.WriteLine(t.Result(i))
                             Next
                         End Sub)

It is possible to chain many tasks and then wait for the last task to be executed. However, you have to be careful with the continuous changes in the states when checking their values for all these asynchronous operations. In addition, it is very important to consider all the potential exceptions that could be thrown. In the previous snippet, for instance, reading t.Result inside the continuation throws an AggregateException if t1 faulted or was canceled.
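When a continuation should run only in some cases, you can pass a TaskContinuationOptions value to ContinueWith. In this hypothetical sketch, one continuation uses OnlyOnRanToCompletion and another uses OnlyOnFaulted, so exactly one of them runs and the other one is canceled.

```vbnet
Imports System
Imports System.Threading.Tasks

Module ContinuationSketch
    ' Hypothetical helper: starts an antecedent task that either returns 42
    ' or throws, and reports which conditional continuation ran
    Function DescribeOutcome(shouldFail As Boolean) As String
        Dim antecedent = Task.Run(Function()
                                      If shouldFail Then
                                          Throw New InvalidOperationException("Boom")
                                      End If
                                      Return 42
                                  End Function)

        ' Runs only if the antecedent completed successfully
        Dim onSuccess = antecedent.ContinueWith(
            Function(t) "Result: " & t.Result.ToString(),
            TaskContinuationOptions.OnlyOnRanToCompletion)

        ' Runs only if the antecedent threw an unhandled exception;
        ' reading t.Exception also marks that exception as observed
        Dim onFailure = antecedent.ContinueWith(
            Function(t) "Failed: " & t.Exception.InnerException.Message,
            TaskContinuationOptions.OnlyOnFaulted)

        ' The continuation whose condition isn't met is canceled, so
        ' reading its Result throws an AggregateException
        Try
            Return onSuccess.Result
        Catch ex As AggregateException
            Return onFailure.Result
        End Try
    End Function

    Sub Main()
        Console.WriteLine(DescribeOutcome(False))
        Console.WriteLine(DescribeOutcome(True))
    End Sub
End Module
```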

Preparing the Code for Parallelism

Parallel programming applied to certain complex algorithms is not as simple as shown in the previously explained examples. Sometimes, the differences between a reliable and bug-free parallelized version and its sequential counterpart could reveal an initially unexpected complexity. The code can become too complex, even when taking advantage of the new features offered by TPL. In fact, a complex sequential algorithm is probably going to be a more complex parallel algorithm. Therefore, TPL offers many new data structures for parallel programming that simplify many complex synchronization problems:

The aforementioned data structures were designed to avoid locks wherever possible, and use fine-grained locking when they are necessary on their different shared resources. Locks generate many potential bugs and can significantly reduce scalability. However, sometimes they are necessary because writing lock-free code isn't always possible.

These new data structures enable you to forget about complex lock mechanisms in certain situations because they already include all the necessary lightweight synchronization under the hood. Therefore, it is a good idea to use these data structures whenever possible.

Synchronization Primitives

Furthermore, .NET Framework 4.5 offers synchronization primitives for managing and controlling the interactions between different tasks and their underlying threads, including the following operations:

Synchronization Problems

The aforementioned synchronization primitives are advanced topics that require an in-depth analysis in order to determine the most convenient primitive to apply in a given situation. It is important to use the right synchronization primitive in order to avoid the potential pitfalls explained in the following list, while still keeping the code scalable.

Many techniques and new debugging tools can simplify the most complex problems, such as the following:

Understanding Concurrent Collection Features

Lists, collections, and arrays are excellent examples of when complex synchronization management is needed to access them concurrently and in parallel. If you have to write a parallel loop that adds elements in an unordered way into a shared collection, you have to add a synchronization mechanism to generate a thread-safe collection. The classic lists, collections, and arrays are not thread-safe because they aren't prepared to receive concurrent instructions to add or remove elements. Therefore, creating a thread-safe collection is indeed a very complex job.
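For comparison, this is roughly what the manual approach looks like with a classic List(Of Integer): every Add call made from Parallel.For has to be protected with SyncLock, and the lock serializes those calls. The module and function names are hypothetical.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks

Module LockedListSketch
    ' Adds 1..count to a classic List(Of Integer) from a parallel loop,
    ' protecting every Add call with SyncLock
    Function FillWithLock(count As Integer) As List(Of Integer)
        Dim numbers As New List(Of Integer)
        Dim gate As New Object()
        Parallel.For(1, count + 1, Sub(i)
                                       ' Without this lock, concurrent Add calls
                                       ' could corrupt the list's internal state
                                       SyncLock gate
                                           numbers.Add(i)
                                       End SyncLock
                                   End Sub)
        Return numbers
    End Function

    Sub Main()
        Console.WriteLine(FillWithLock(10000).Count)
    End Sub
End Module
```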

System.Collections.Concurrent

Luckily, TPL offers a new namespace, System.Collections.Concurrent, for dealing with thread-safety issues. As previously explained, this namespace provides access to the custom partitioners for parallelized loops. In addition, it offers access to the following collections prepared for concurrency:


Note
You don't have to worry about locks and synchronization primitives while using the aforementioned collections in many tasks, because they are already prepared to receive concurrent and parallel method calls. They help you avoid deadlocks and race conditions, and they make it easier to work with parallelized code in many advanced scenarios.

ConcurrentQueue

It would be difficult to use a classic shared list to add elements from many independent tasks created by the Parallel.ForEach method. You would need to add synchronization code, which would be a great challenge without restricting the overall scalability. However, it is possible to add strings to a queue (enqueue strings) in a shared ConcurrentQueue inside the parallelized code, because it is prepared for adding elements concurrently.

The following code (code file Listing18.sln) uses a shared ConcurrentQueue(Of String), Keys, in order to hold the strings that contain the AES keys that begin with a certain prefix, generated in a parallelized loop with the custom partitioner. All the tasks created automatically by Parallel.ForEach are going to call the Enqueue method to add the elements that comply with the condition.

Keys.Enqueue(hexString)  

It is indeed simple to work with a ConcurrentQueue. There is no need to worry about synchronization problems because everything is controlled under the hood.

' Assumes Imports System.Collections.Concurrent, System.Security.Cryptography
' and System.Threading.Tasks at the top of the module file
Private Keys As ConcurrentQueue(Of String)

Private Sub ParallelPartitionGenerateAESKeysWCP(
    ByVal ct As System.Threading.CancellationToken,
    ByVal prefix As Char)

    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim parallelOptions As New ParallelOptions()
    ' Set the CancellationToken for the ParallelOptions instance
    parallelOptions.CancellationToken = ct
    Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), parallelOptions,
        Sub(range)
            Dim aesM As New AesManaged()
            'Debug.WriteLine("Range ({0}, {1}. Time: {2})",
            '                range.Item1, range.Item2, Now().TimeOfDay)
            For i As Integer = range.Item1 To range.Item2 - 1
                aesM.GenerateKey()
                Dim result = aesM.Key
                Dim hexString = ConvertToHexString(result)
                ' Console.WriteLine("AES: " + ConvertToHexString(result))
                If Left(hexString, 1) = prefix Then
                    Keys.Enqueue(hexString)
                End If
                parallelOptions.CancellationToken.ThrowIfCancellationRequested()
            Next
        End Sub)
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token
    Keys = New ConcurrentQueue(Of String)

    Dim tAsync = New Task(Sub() ParallelPartitionGenerateAESKeysWCP(ct, "A"c))
    tAsync.Start()

    ' Do something else
    ' Wait for tAsync to finish
    tAsync.Wait()

    Console.ReadLine()
End Sub

For example, it is possible to run many LINQ queries to display partial statistics while the task is still adding elements to the ConcurrentQueue (Keys). This is safe because enumerating a ConcurrentQueue works on a moment-in-time snapshot of its contents. The following code (code file: Listing19.sln) shows a new Main subroutine that checks whether the task (tAsync) is running or waiting to run, and while this happens it runs a LINQ query to show the number of keys that contain an F in the shared ConcurrentQueue (Keys).

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

    Keys = New ConcurrentQueue(Of String)
    Dim tAsync = Task.Factory.StartNew(
        Sub() ParallelPartitionGenerateAESKeysWCP(ct, "A"c))

    Do While (tAsync.Status = TaskStatus.Running) OrElse
             (tAsync.Status = TaskStatus.WaitingToRun)
        ' Display partial results
        Dim countQuery = Aggregate key In Keys
                         Where key.Contains("F")
                         Into Count()

        Console.WriteLine("So far, the number of keys that contain an F is: {0}",
                          countQuery)
        ' Sleep the main thread for 0.5 seconds
        Threading.Thread.Sleep(500)
    Loop

    tAsync.Wait()

    ' Do something else

    Console.ReadLine()
End Sub

Another useful feature is the capability to remove an element at the beginning of the queue in a safe way using its TryDequeue method:

Dim firstKey As String = Nothing
If Keys.TryDequeue(firstKey) Then
    ' firstKey has the first key added to the ConcurrentQueue
Else
    ' It wasn't possible to remove an element from the ConcurrentQueue
End If

TryDequeue returns a Boolean value indicating whether the operation was successful. It returns the element through an output parameter; in this case, a String received by reference (firstKey).

It is possible to add and remove elements in different tasks.
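A minimal producer-consumer sketch of that idea, using hypothetical names: one task enqueues integers while a second task dequeues them with TryDequeue at the same time. The consumer simply spins when the queue is momentarily empty, which keeps the example short; BlockingCollection(Of T), also in System.Collections.Concurrent, offers a blocking Take method for that purpose.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading.Tasks

Module QueueProducerConsumerSketch
    ' Hypothetical helper: one task enqueues 1..count while another task
    ' dequeues concurrently; returns the sum of the dequeued values
    Function ProduceAndConsume(count As Integer) As Long
        Dim queue As New ConcurrentQueue(Of Integer)

        ' Producer: Enqueue is safe to call while another task dequeues
        Dim producer = Task.Run(Sub()
                                    For i As Integer = 1 To count
                                        queue.Enqueue(i)
                                    Next
                                End Sub)

        ' Consumer: TryDequeue returns False when the queue is momentarily empty
        Dim consumer = Task.Run(Function()
                                    Dim total As Long = 0
                                    Dim item As Integer
                                    Do
                                        If queue.TryDequeue(item) Then
                                            total += item
                                        ElseIf producer.IsCompleted AndAlso queue.IsEmpty Then
                                            ' The producer finished and nothing is left
                                            Exit Do
                                        End If
                                    Loop
                                    Return total
                                End Function)

        Return consumer.Result
    End Function

    Sub Main()
        Console.WriteLine(ProduceAndConsume(1000))
    End Sub
End Module
```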

ConcurrentStack

ConcurrentStack is very similar to the previously explained ConcurrentQueue but it uses different method names to better represent a stack (a LIFO collection). Its most important methods are Push and TryPop.

Push inserts an element at the top of the ConcurrentStack. If Keys were a ConcurrentStack(Of String), the following lines would add hexString at the top of the stack:

If Left(hexString, 1) = prefix Then
    Keys.Push(hexString)
End If

You can remove an element at the top of the stack in a safe way using its TryPop method. However, in this case, the method will return the last element added because it is a stack and not a queue:

Dim lastKey As String = Nothing
If Keys.TryPop(lastKey) Then
    ' lastKey has the last key added to the ConcurrentStack
Else
    ' It wasn't possible to remove an element from the ConcurrentStack
End If

TryPop also returns a Boolean value indicating whether the operation was successful.
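The LIFO behavior is easy to check with a short, hypothetical sketch: it pushes three strings onto a ConcurrentStack(Of String) and then calls TryPop until the stack is empty, so the strings come back in reverse order.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module ConcurrentStackSketch
    ' Pushes the items in order, then pops until the stack is empty and
    ' returns the elements in the order TryPop removed them
    Function PushThenPopAll(items As IEnumerable(Of String)) As List(Of String)
        Dim stack As New ConcurrentStack(Of String)
        For Each item In items
            stack.Push(item)
        Next

        Dim popped As New List(Of String)
        Dim top As String = Nothing
        ' TryPop returns False once the stack is empty
        Do While stack.TryPop(top)
            popped.Add(top)
        Loop
        Return popped
    End Function

    Sub Main()
        ' The last element pushed is the first one popped: C, B, A
        Console.WriteLine(String.Join(", ", PushThenPopAll({"A", "B", "C"}).ToArray()))
    End Sub
End Module
```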

Transforming LINQ into PLINQ

You already learned that LINQ is very useful to query and process different data sources. If you are using LINQ to Objects, it is possible to take advantage of parallelism using its parallel implementation, Parallel LINQ (PLINQ).


Note
PLINQ implements the full set of LINQ query operators and adds operators for parallel execution. PLINQ can achieve significant speedups over its LINQ counterpart but, as always with parallelism, it depends on the scenario. If the query involves an appreciable number of calculations and memory-intensive operations and ordering doesn't matter, the speedups could be significant. However, when ordering matters, the speedups could be reduced.

As you might have expected, LINQ and PLINQ can work with the previously explained concurrent collections. The following code defines a simple but intensive function to count and return the number of letters in a string received as a parameter (code file: Snippet06.sln):

Function CountLetters(ByVal key As String) As Integer
    Dim letters As Integer = 0
    For i As Integer = 0 To key.Length() - 1
        If Char.IsLetter(key, i) Then letters += 1
    Next
    Return letters
End Function

A simple LINQ expression to return all the AES keys with at least 10 letters containing an A, an F, a 9, and not a B, would look like the following:

Dim keysWith10Letters = From key In Keys
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

In order to transform the aforementioned LINQ expression into a PLINQ expression that can take advantage of parallelism, it is necessary to use the AsParallel method, as shown here:

Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

This way, the query will try to take advantage of all the available logical cores at run time in order to run faster than its sequential version.
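The following self-contained sketch runs the same kind of filter with LINQ and with PLINQ; the only difference is the AsParallel call. It assumes hypothetical sample data (eight-digit hexadecimal strings) instead of the AES keys, and a lower letter threshold so that the short strings can match. Both functions should return the same count.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq

Module PlinqSketch
    ' Same letter-counting logic as the chapter's CountLetters function
    Function CountLetters(ByVal key As String) As Integer
        Dim letters As Integer = 0
        For i As Integer = 0 To key.Length - 1
            If Char.IsLetter(key, i) Then letters += 1
        Next
        Return letters
    End Function

    ' Hypothetical sample data: 8-digit hexadecimal strings for 0..count-1
    Function SampleKeys(count As Integer) As List(Of String)
        Return Enumerable.Range(0, count).Select(Function(n) n.ToString("X8")).ToList()
    End Function

    Function CountSequential(keys As List(Of String)) As Integer
        Dim query = From key In keys
                    Where CountLetters(key) >= 2 AndAlso key.Contains("A")
        Return query.Count()
    End Function

    Function CountParallel(keys As List(Of String)) As Integer
        ' The only change is AsParallel on the data source
        Dim query = From key In keys.AsParallel()
                    Where CountLetters(key) >= 2 AndAlso key.Contains("A")
        Return query.Count()
    End Function

    Sub Main()
        Dim keys = SampleKeys(1000000)
        Console.WriteLine("LINQ: {0}", CountSequential(keys))
        Console.WriteLine("PLINQ: {0}", CountParallel(keys))
    End Sub
End Module
```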

It is possible to add code at the end of the Main subroutine to return some results according to the PLINQ query (code file: Snippet06.sln):

Dim sw = Stopwatch.StartNew()

Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

' Run the deferred PLINQ query only once; otherwise each call to Count()
' and each indexed access would execute the whole query again
Dim results = keysWith10Letters.ToList()

Console.WriteLine("The code generated {0} keys with at least ten letters, " &
                  "A, F and 9 but no B in the hexadecimal code.", results.Count)
If results.Count > 0 Then
    Console.WriteLine("First key: {0}", results(0))
    Console.WriteLine("Last key: {0}", results(results.Count - 1))
End If
Debug.WriteLine(sw.Elapsed.ToString())

Console.ReadLine()

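This code shows the number of keys that comply with the conditions, and the first and the last keys in the results of the PLINQ query that worked against the ConcurrentQueue(Of String). Because the query doesn't call AsOrdered, the order of the results, and therefore which keys are reported as first and last, can vary between runs.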

ParallelEnumerable and Its AsParallel Method

The System.Linq.ParallelEnumerable class is responsible for exposing most of PLINQ's additional functionality, including its most important one: the AsParallel method. The following table summarizes the PLINQ-specific methods.

PLINQ Operators Exposed by ParallelEnumerable

Value Description
AsOrdered() PLINQ must preserve the ordering of the source sequence for the rest of the query or until it changes using an Order By clause.
AsParallel() The rest of the query should be parallelized, whenever possible.
AsSequential() The rest of the query should run sequentially, as traditional LINQ.
AsUnordered() PLINQ doesn't have to preserve the ordering of the source sequence.
ForAll() An enumeration method that enables the results to be processed in parallel, using multiple tasks.
WithCancellation() Enables working with a cancellation token to permit cancellation of the query execution, as previously explained with tasks.
WithDegreeOfParallelism() Sets the maximum number of concurrently executing tasks that PLINQ uses to process the query.
WithExecutionMode() This can force parallel execution when the default behavior would be to run it sequentially as traditional LINQ.
WithMergeOptions() This can provide hints about the way PLINQ should merge the parallel pieces of the result on the thread that is consuming the query.

In addition, ParallelEnumerable offers an Aggregate overload that enables the implementation of parallel reduction algorithms. It enables intermediate aggregation on each parallelized part of the query and a final aggregation function that is capable of providing the logic to combine the results of all the generated partitions.
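A minimal sketch of such a parallel reduction follows, assuming nothing beyond System.Linq. It uses the ParallelEnumerable.Aggregate overload that takes a seed, a function that updates each partition's accumulator, a function that combines two partition results, and a final result selector. The sum of squares is only an illustrative workload.

```vbnet
Imports System
Imports System.Linq

Module ParallelReductionSketch
    ' Sum of the squares of 1..n computed as a parallel reduction.
    ' Arguments: seed for each partition, per-element update of the
    ' partition accumulator, combination of two partition results,
    ' and the final result selector.
    Function SumOfSquares(n As Integer) As Long
        Return Enumerable.Range(1, n).AsParallel().Aggregate(
            0L,
            Function(acc As Long, x As Integer) acc + CLng(x) * x,
            Function(partial1 As Long, partial2 As Long) partial1 + partial2,
            Function(total As Long) total)
    End Function

    Sub Main()
        Console.WriteLine(SumOfSquares(1000))
    End Sub
End Module
```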

Sometimes it is useful to run a PLINQ query with many different degrees of parallelism in order to measure its scalability. For example, the following line limits the previously shown PLINQ query to at most three concurrently executing tasks, so that it takes advantage of no more than three cores:

Dim keysWith10Letters = From key In Keys.AsParallel().WithDegreeOfParallelism(3)
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")
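A scalability measurement along those lines could loop over several degrees of parallelism and time each run. The sketch below is hypothetical: it counts multiples of 7 instead of filtering AES keys so that it has no external dependencies, and the timings it prints depend entirely on the hardware.

```vbnet
Imports System
Imports System.Diagnostics
Imports System.Linq

Module DegreeOfParallelismSketch
    ' Counts the multiples of 7 in 1..n using at most "degree"
    ' concurrently executing tasks
    Function CountWithDegree(n As Integer, degree As Integer) As Integer
        Dim query = Enumerable.Range(1, n).AsParallel().WithDegreeOfParallelism(degree)
        Return query.Count(Function(x) x Mod 7 = 0)
    End Function

    Sub Main()
        ' Measure the same query with 1, 2, ... up to the number of logical cores
        For degree As Integer = 1 To Environment.ProcessorCount
            Dim sw = Stopwatch.StartNew()
            Dim matches = CountWithDegree(50000000, degree)
            Console.WriteLine("Degree {0}: {1} matches in {2}", degree, matches, sw.Elapsed)
        Next
    End Sub
End Module
```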

AsOrdered and Order By

Because using AsOrdered and the Order By clause in PLINQ queries can reduce any speed gains, it is very important to compare the speedup achieved against the sequential version before requesting ordered results.
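To see what AsOrdered preserves, consider this hypothetical sketch: it filters a range in parallel, and AsOrdered guarantees that ToArray returns the matches in source order even though different tasks may process different chunks of the range.

```vbnet
Imports System
Imports System.Linq

Module AsOrderedSketch
    ' Filters 1..n in parallel but keeps the results in source order
    Function OrderedEvens(n As Integer) As Integer()
        Dim query = From x In Enumerable.Range(1, n).AsParallel().AsOrdered()
                    Where x Mod 2 = 0
        Return query.ToArray()
    End Function

    Sub Main()
        ' Without AsOrdered, the order of this output could vary between runs
        Dim texts = OrderedEvens(20).Select(Function(x) x.ToString()).ToArray()
        Console.WriteLine(String.Join(", ", texts))
    End Sub
End Module
```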

If a PLINQ query doesn't achieve significant performance improvements, you have another interesting option to take advantage of parallelism: running many LINQ queries in independent tasks or using Parallel.Invoke.
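A minimal sketch of the Parallel.Invoke option, with two unrelated LINQ to Objects queries chosen only for illustration: each action writes to its own variable, so no synchronization is needed, and Parallel.Invoke returns only after both actions have finished.

```vbnet
Imports System
Imports System.Linq
Imports System.Threading.Tasks

Module InvokeQueriesSketch
    ' Runs two independent LINQ to Objects queries concurrently
    ' and returns both results as a tuple
    Function RunBoth(n As Integer) As Tuple(Of Integer, Long)
        Dim multiplesOf3 As Integer = 0
        Dim sumOfEvens As Long = 0
        ' Each action writes only its own variable, so no lock is required;
        ' Parallel.Invoke returns after both actions complete
        Parallel.Invoke(
            Sub() multiplesOf3 = Enumerable.Range(1, n).Count(Function(x) x Mod 3 = 0),
            Sub() sumOfEvens = Enumerable.Range(1, n).Where(Function(x) x Mod 2 = 0).Select(Function(x) CLng(x)).Sum())
        Return Tuple.Create(multiplesOf3, sumOfEvens)
    End Function

    Sub Main()
        Dim results = RunBoth(1000000)
        Console.WriteLine("Multiples of 3: {0}, sum of evens: {1}",
                          results.Item1, results.Item2)
    End Sub
End Module
```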

Working with ForAll and a ConcurrentBag

The ForAll extension method is very useful to process the results of a query in parallel without having to write a parallel loop. It receives an action delegate that it invokes for each element of the results and, as with the Task constructors, you can supply this action as a lambda expression. Therefore, you can combine parallelized processing actions with the results of a PLINQ query. The following lines add elements in parallel to a new ConcurrentBag (keysBag), an unordered collection of Integer, counting the letters for each of the keys in the results of the previous PLINQ query (code file: Snippet07.sln):

Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

Dim keysBag As New ConcurrentBag(Of Integer)
keysWith10Letters.ForAll(Sub(key) keysBag.Add(CountLetters(key)))

Note
This parallel processing is possible because ConcurrentBag is one of the concurrent collections that allows many elements to be added by multiple tasks running in parallel.
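The following self-contained sketch applies the same ForAll and ConcurrentBag combination to a hypothetical array of words instead of the AES keys; the bag receives one length per matching word, in no particular order.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Linq

Module ForAllSketch
    ' Adds the length of every word that contains "a" to a ConcurrentBag,
    ' processing the PLINQ query results in parallel with ForAll
    Function LengthsOfWordsWithA(words As String()) As ConcurrentBag(Of Integer)
        Dim lengths As New ConcurrentBag(Of Integer)
        Dim query = From w In words.AsParallel()
                    Where w.Contains("a")
        ' ForAll runs the action on the query's own worker tasks;
        ' ConcurrentBag.Add is safe to call from all of them
        query.ForAll(Sub(w) lengths.Add(w.Length))
        Return lengths
    End Function

    Sub Main()
        Dim words = {"alpha", "beta", "gamma", "delta", "epsilon"}
        Dim lengths = LengthsOfWordsWithA(words)
        Console.WriteLine("{0} matching words, {1} letters in total",
                          lengths.Count, lengths.Sum())
    End Sub
End Module
```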

Summary

This chapter provided an overview of the task-based programming model introduced with .NET Framework 4 and improved in .NET Framework 4.5. The chapter introduced some of its classes, structures, and enumerations. In order to help you tackle the multicore revolution, it also explained several related concepts used in basic concurrent and parallel programming designs, including the following key points:

The next chapter will dive deep into the different deployment options for .NET applications. You will learn to easily select the most convenient deployment mechanism according to the kind of application you have developed and your target environments.
