Книга: Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.
Назад: Глава 3. Асинхронные потоки
Дальше: Глава 5. Основы Dataflow

Глава 4. Основы параллельного программирования

В этой главе представлены паттерны параллельного программирования. Параллельное программирование используется для разбиения блоков работы, ограниченных по вычислениям, и распределения их между несколькими потоками. Рецепты параллельной обработки ориентируются только на счетную работу. Если у вас имеются операции, асинхронные по своей природе (например, работа, связанная с вводом/выводом), которые должны выполняться параллельно, обращайтесь к главе 2 и рецепту 2.4.

Абстракции параллельной обработки, рассмотренные в этой главе, являются частью библиотеки TPL (Task Parallel Library). Библиотека TPL является частью фреймворка .NET.

4.1. Параллельная обработка данных

Задача

Имеется коллекция данных. Требуется выполнить одну и ту же операцию с каждым элементом данных. Эта операция является ограниченной по вычислениям и может занять некоторое время.

Решение

Тип Parallel содержит метод ForEach, разработанный специально для этой задачи. Следующий пример получает коллекцию матриц и поворачивает эти матрицы:

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)

{

  Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));

}

Возможны ситуации, в которых преждевременно требуется прервать цикл (например, при обнаружении недействительного значения). Следующий пример обращает каждую матрицу, но при обнаружении недействительной матрицы цикл будет прерван:

void InvertMatrices(IEnumerable<Matrix> matrices)

{

  Parallel.ForEach(matrices, (matrix, state) =>

  {

    if (!matrix.IsInvertible)

      state.Stop();

    else

      matrix.Invert();

  });

}

Этот код использует ParallelLoopState.Stop для остановки цикла и предотвращения любых дальнейших вызовов тела цикла. Учтите, что цикл является параллельным, поэтому могут уже выполняться другие вызовы тела цикла, включая вызовы для элементов, следующих после текущего. В приведенном примере кода если третья матрица не является обратимой, то цикл прерывается и новые матрицы обрабатываться не будут, но может оказаться, что уже обрабатываются другие матрицы (например, четвертая и пятая).

Более распространенная ситуация встречается тогда, когда требуется отменить параллельный цикл. Это не то же, что остановка цикла; цикл останавливается изнутри и отменяется за своими пределами. Например, кнопка отмены может отменить CancellationTokenSource, отменяя параллельный цикл, как в следующем примере:

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees,

    CancellationToken token)

{

  Parallel.ForEach(matrices,

      new ParallelOptions { CancellationToken = token },

      matrix => matrix.Rotate(degrees));

}

Следует иметь в виду, что каждая параллельная задача может выполняться в другом потоке, поэтому любое совместное состояние должно быть защищено. Следующий пример обращает каждую матрицу и подсчитывает количество матриц, которые обратить не удалось:

// Примечание: это не самая эффективная реализация.

// Это всего лишь пример использования блокировки

// для защиты совместного состояния.

int InvertMatrices(IEnumerable<Matrix> matrices)

{

  object mutex = new object();

  int nonInvertibleCount = 0;

  Parallel.ForEach(matrices, matrix =>

  {

    if (matrix.IsInvertible)

    {

      matrix.Invert();

    }

    else

    {

      lock (mutex)

      {

        ++nonInvertibleCount;

      }

    }

  });

  return nonInvertibleCount;

}

Пояснение

Метод Parallel.ForEach предоставляет возможность параллельной обработки для последовательности значений. Аналогичное решение Parallel LINQ (PLINQ) предоставляет практически те же возможности в LINQ-подобном синтаксисе. Одно из различий между Parallel и PLINQ заключается в том, что PLINQ предполагает, что может использовать все ядра на компьютере, тогда как Parallel может динамически реагировать на изменения условий процессора.

Parallel.ForEach реализует параллельный цикл foreach. Если вам потребуется выполнить параллельный цикл for, то класс Parallel также поддерживает метод Parallel.For. Метод Parallel.For особенно полезен при работе с несколькими массивами данных, которые получают один индекс.

Дополнительная информация

В рецепте 4.2 рассматривается параллельное агрегирование серий значений, включая суммирование и вычисление средних значений.

В рецепте 4.5 рассматриваются основы PLINQ.

В главе 10 рассматривается отмена.

4.2. Параллельное агрегирование

Задача

Требуется агрегировать результаты при завершении параллельной операции (примеры агрегирования — суммирование значений или вычисление среднего).

Решение

Для поддержки агрегирования класс Parallel использует концепцию локальных значений — переменных, существующих локально внутри параллельного цикла. Это означает, что тело цикла может просто обратиться к значению напрямую, без необходимости синхронизации. Когда цикл готов к агрегированию всех своих локальных результатов, он делает это с помощью делегата localFinally. Следует отметить, что делегату localFinally не нужно синхронизировать доступ к переменной для хранения результата. Пример параллельного суммирования:

// Примечание: это не самая эффективная реализация.

// Это всего лишь пример использования блокировки

// для защиты совместного состояния.

int ParallelSum(IEnumerable<int> values)

{

  object mutex = new object();

  int result = 0;

  Parallel.ForEach(source: values,

      localInit: () => 0,

      body: (item, state, localValue) => localValue + item,

      localFinally: localValue =>

      {

        lock (mutex)

          result += localValue;

      });

  return result;

}

В Parallel LINQ реализована более понятная поддержка агрегирования, чем в классе Parallel:

int ParallelSum(IEnumerable<int> values)

{

  return values.AsParallel().Sum();

}

О'кей, это был дешевый трюк, потому что в PLINQ реализована встроенная поддержка многих распространенных операторов (например, Sum). В PLINQ также предусмотрена обобщенная поддержка агрегирования с оператором Aggregate:

int ParallelSum(IEnumerable<int> values)

{

  return values.AsParallel().Aggregate(

      seed: 0,

      func: (sum, item) => sum + item

  );

}

Пояснение

Если вы уже используете класс Parallel, следует использовать его поддержку агрегирования. В остальных случаях поддержка PLINQ, как правило, более выразительна, а код получается короче.

Дополнительная информация

В рецепте 4.5 изложены основы PLINQ.

4.3. Параллельный вызов

Задача

Имеется набор методов, которые должны вызываться параллельно. Эти методы (в основном) независимы друг от друга.

Решение

Класс Parallel содержит простой метод Invoke, спроектированный для таких сценариев. В следующем примере массив разбивается надвое, и две половины обрабатываются независимо:

void ProcessArray(double[] array)

{

  Parallel.Invoke(

      () => ProcessPartialArray(array, 0, array.Length / 2),

      () => ProcessPartialArray(array, array.Length / 2, array.Length)

  );

}

 

void ProcessPartialArray(double[] array, int begin, int end)

{

  // Обработка, интенсивно использующая процессор...

}

Методу Parallel.Invoke также можно передать массив делегатов, если количество вызовов неизвестно до момента выполнения:

void DoAction20Times(Action action)

{

  Action[] actions = Enumerable.Repeat(action, 20).ToArray();

  Parallel.Invoke(actions);

}

Parallel.Invoke поддерживает отмену, как и другие методы класса Parallel:

void DoAction20Times(Action action, CancellationToken token)

{

  Action[] actions = Enumerable.Repeat(action, 20).ToArray();

  Parallel.Invoke(new ParallelOptions { CancellationToken = token },

    actions);

}

Пояснение

Метод Parallel.Invoke — отличное решение для простого параллельного вызова. Отмечу, что он уже не так хорошо подходит для ситуаций, в которых требуется активизировать действие для каждого элемента входных данных (для этого лучше использовать Parallel.ForEach), или если каждое действие производит некоторый вывод (вместо этого следует использовать Parallel LINQ).

Дополнительная информация

В рецепте 4.1 рассматривается метод Parallel.ForEach, который выполняет действие для каждого элемента данных.

В рецепте 4.5 рассматривается Parallel LINQ.

4.4. Динамический параллелизм

Задача

Требуется реализовать более сложную параллельную ситуацию: структура и количество параллельных задач зависит от информации, которая становится известной только во время выполнения.

Решение

В библиотеке TPL (Task Parallel Library) центральное место занимает тип Task. Класс Parallel и Parallel LINQ — всего лишь удобные обертки для мощного типа Task. Если потребуется реализовать динамический параллелизм, проще использовать тип Task напрямую.

В приведенном ниже примере для каждого узла бинарного дерева необходимо выполнить некоторую затратную обработку. Структура дерева неизвестна до стадии выполнения, поэтому этот сценарий хорошо подойдет для динамического параллелизма. Метод Traverse обрабатывает текущий узел, а затем создает две дочерние задачи, по одной для каждой ветви под узлом (в данном примере предполагается, что родительские узлы должны быть обработаны до перехода к дочерним узлам). Метод ProcessTree начинает обработку, создавая родительскую задачу верхнего уровня и ожидая ее завершения:

void Traverse(Node current)

{

  DoExpensiveActionOnNode(current);

  if (current.Left != null)

  {

    Task.Factory.StartNew(

        () => Traverse(current.Left),

        CancellationToken.None,

        TaskCreationOptions.AttachedToParent,

        TaskScheduler.Default);

  }

  if (current.Right != null)

  {

    Task.Factory.StartNew(

        () => Traverse(current.Right),

        CancellationToken.None,

        TaskCreationOptions.AttachedToParent,

        TaskScheduler.Default);

  }

}

 

void ProcessTree(Node root)

{

  Task task = Task.Factory.StartNew(

      () => Traverse(root),

      CancellationToken.None,

      TaskCreationOptions.None,

      TaskScheduler.Default);

  task.Wait();

}

Флаг AttachedToParent гарантирует, что задача Task для каждой ветви связывается с задачей Task своего родительского узла. Таким образом создаются отношения «родитель/потомок» между экземплярами Task, моделирующими отношения «родитель/потомок» в узлах дерева. Родительские задачи выполняют своего делегата, после чего ожидают завершения своих дочерних задач. Исключения от дочерних задач распространяются от дочерних задач к своей родительской задаче. Таким образом, ProcessTree может ожидать задач для всего дерева, для чего достаточно вызвать Wait для одной задачи Task в корне дерева.

Если ваша ситуация не относится к категории «родитель/потомок», вы можете запланировать запуск любой задачи после другой задачи, используя продолжение. Продолжение (continuation) представляет собой отдельную задачу, которая выполняется после завершения исходной:

Task task = Task.Factory.StartNew(

    () => Thread.Sleep(TimeSpan.FromSeconds(2)),

    CancellationToken.None,

    TaskCreationOptions.None,

    TaskScheduler.Default);

Task continuation = task.ContinueWith(

    t => Trace.WriteLine("Task is done"),

    CancellationToken.None,

    TaskContinuationOptions.None,

    TaskScheduler.Default);

// Аргумент "t" для продолжения - то же, что "task".

Пояснение

CancellationToken.None и TaskScheduler.Default используются в предыдущем примере кода. Маркеры отмены рассматриваются в рецепте 10.2, а планировщики задач — в рецепте 13.3. Всегда лучше явно задать планировщик TaskScheduler, используемый StartNew и ContinueWith.

Такая структура родительских и дочерних задач типична для динамического параллелизма, хотя и не обязательна. С таким же успехом можно сохранить каждую новую задачу в потокобезопасной коллекции, а затем ожидать завершения их всех с использованием Task.WaitAll.

scorp.tiff

Использование Task для параллельной обработки принципиально отличается от использования Task для асинхронной обработки.

Тип Task в параллельном программировании служит двум целям: он может представлять параллельную или асинхронную задачу. Параллельные задачи могут использовать блокирующие методы, такие как Task.Wait, Task.Result, Task.WaitAll и Task.WaitAny. Параллельные задачи также обычно используют AttachedToParent для создания отношений «родитель/потомок» между задачами. Параллельные задачи следует создавать методами Task.Run или Task.Factory.StartNew.

С другой стороны, асинхронным задачам следует избегать блокирующих методов в пользу await, Task.WhenAll и Task.WhenAny. Асинхронные задачи не должны использовать AttachedToParent, но они могут формировать неявные отношения «родитель/потомок», используя ожидание других задач.

Дополнительная информация

В рецепте 4.3 рассматривается параллельный вызов последовательности методов в том случае, если все методы известны на момент начала параллельной работы.

4.5. Parallel LINQ

Задача

Требуется выполнить параллельную обработку последовательности данных, чтобы сгенерировать другую их последовательность или обобщение этих данных.

Решение

Многие разработчики знакомы с технологией LINQ, позволяющей программировать вычисления с последовательностями, работающей по принципу вытягивания. Parallel LINQ (PLINQ) расширяет эту поддержку LINQ параллельной обработкой.

PLINQ хорошо работает в потоковых сценариях, которые получают последовательность входных значений и производят последовательность выходных значений. Следующий простой пример просто умножает каждый элемент последовательности на 2 (в реальных сценариях выполняются гораздо более серьезные вычисления, чем простое умножение):

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)

{

  return values.AsParallel().Select(value => value * 2);

}

Пример может генерировать свои выходные значения в любом порядке; это поведение используется по умолчанию в Parallel LINQ. Также можно потребовать, чтобы сохранялся исходный порядок. В следующем примере обработка ведется параллельно, но с сохранением исходного порядка:

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)

{

  return values.AsParallel().AsOrdered().Select(value => value * 2);

}

Другое логичное применение Parallel LINQ — агрегирование или обобщение данных в параллельном режиме. В следующем примере выполняется параллельное суммирование:

int ParallelSum(IEnumerable<int> values)

{

  return values.AsParallel().Sum();

}

Пояснение

Класс Parallel хорошо подходит для многих сценариев, но код PLINQ получается более простым при агрегировании или преобразовании одной последовательности в другую. Следует помнить, что класс Parallel ведет себя более корректно с другими процессами в системе, чем PLINQ; этот фактор становится особенно существенным при выполнении параллельной обработки на серверной машине.

PLINQ предоставляет параллельные версии многих операторов, включая фильтрацию (Where), проекцию (Select) и разные виды агрегирования, такие как Sum, Average и более общую форму Aggregate. В общем случае все, что можно сделать с обычным LINQ, также можно сделать в параллельном режиме с PLINQ. В результате PLINQ становится отличным кандидатом для переработки существующего кода LINQ, который выиграл бы от выполнения в параллельном режиме.

Дополнительная информация

В рецепте 4.1 рассматривается использование класса Parallel для выполнения кода для каждого элемента в последовательности.

В рецепте 10.5 рассматривается отмена запросов PLINQ.

Назад: Глава 3. Асинхронные потоки
Дальше: Глава 5. Основы Dataflow