Книга: Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.
Назад: Глава 4. Основы параллельного программирования
Дальше: Глава 6. Основы System.Reactive

Глава 5. Основы Dataflow

TPL Dataflow — мощная библиотека, позволяющая создать сеть или конвейер, а затем (асинхронно) отправить по ним свои данные. Dataflow использует декларативный стиль программирования; т.е. сначала вы полностью определяете сеть, а затем начинаете обрабатывать данные. Сеть описывает структуру, по которой перемещаются данные. Для этого придется взглянуть на свое приложение под несколько иным углом, но после того как вы сделаете этот шаг, поток данных (dataflow) станет само собой разумеющимся кандидатом для многих сценариев.

Каждая сеть состоит из различных блоков, связанных друг с другом. Отдельные блоки просты, они отвечают только за один этап обработки данных. Когда блок завершает работу над своими данными, он передает свой результат всем связанным блокам.

Чтобы использовать TPL Dataflow, установите в своем приложении NuGet-пакет из System.Threading.Tasks.Dataflow.

5.1. Связывание блоков

Задача

Требуется связать блоки Dataflow для создания сети.

Решение

Блоки, предоставляемые библиотекой TPL Dataflow, определяют только самые базовые составляющие. Многие полезные методы TPL Dataflow в действительности являются методами расширения. Метод расширения LinkTo предоставляет простой механизм связывания блоков потока данных:

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);

var subtractBlock = new TransformBlock<int, int>(item => item - 2);

// После связывания значения, выходящие из multiplyBlock,

// будут входить в subtractBlock.

multiplyBlock.LinkTo(subtractBlock);

По умолчанию связанные блоки только распространяют данные; они не распространяют завершение (или ошибки). Если ваш поток данных линеен (например, в конвейере), то, скорее всего, вы захотите распространять завершение. Чтобы распространять завершение (и ошибки), установите параметр PropagateCompletion для связи:

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);

var subtractBlock = new TransformBlock<int, int>(item => item - 2);

 

var options = new DataflowLinkOptions { PropagateCompletion = true };

multiplyBlock.LinkTo(subtractBlock, options);

 

...

 

// Завершение первого блока автоматически распространяется во второй блок.

multiplyBlock.Complete();

await subtractBlock.Completion;

Пояснение

После связывания данные будут автоматически перемещаться от блока-источника к блоку-приемнику. Параметр PropagateCompletion перемещает не только данные, но и завершение; тем не менее на каждом этапе конвейера сбойный блок будет распространять в следующий блок свое исключение, упакованное в AggregateException. Таким образом, если имеется длинный конвейер, распространяющий завершения, исходная ошибка может быть вложена в несколько экземпляров AggregateException.  AggregateException содержит несколько методов (например, Flatten), упрощающих обработку ошибок в подобных ситуациях.

Блоки потока данных могут связываться друг с другом многими разными способами; сеть может содержать ветвления, соединения и даже циклы. Для большинства сценариев обычно хватает простого линейного конвейера. Мы будем работать в основном с конвейерами (и рассмотрим ветвления); более сложные сценарии выходят за рамки книги.

Тип DataflowLinkOptions предоставляет ряд параметров, которые могут устанавливаться для связей (как, например, параметр PropagateCompletion, использованный в этом решении), а перегруженная версия LinkTo также может получать предикат, использующийся для фильтрации данных, передаваемых через связь. Данные, прошедшие через фильтр, перемещаются по связи; данные, не прошедшие через фильтр, не теряются, а пытаются пройти по альтернативной связи (и остаются в блоке, если другой связи не существует). Если элемент данных «застревает» в блоке, то этот блок не производит других элементов данных; весь блок приостанавливается до извлечения элемента данных.

Дополнительная информация

В рецепте 5.2 рассматривается распространение ошибок по связям.

В рецепте 5.3 рассматриваются связи между блоками.

В рецепте 8.8 рассматривается связывание блоков потока данных с наблюдаемыми потоками System.Reactive.

5.2. Распространение ошибок

Задача

Найти способ реагировать на ошибки, которые могут происходить в сети потока данных.

Решение

Если делегат, переданный блоку потока данных, выдает исключение, то этот блок входит в состояние отказа. Блок в состоянии отказа теряет все свои данные (и перестает принимать новые). В следующем коде блок не производит никаких выходных данных; первое значение выдает исключение, а второе просто теряется:

var block = new TransformBlock<int, int>(item =>

{

  if (item == 1)

    throw new InvalidOperationException("Blech.");

  return item * 2;

});

block.Post(1);

block.Post(2);

Чтобы перехватывать исключения от блока потока данных, необходимо ожидать его свойства Completion. Свойство Completion возвращает объект Task, который завершается при завершении блока, а если в блоке происходит отказ, то и в задаче Completion тоже происходит отказ:

try

{

  var block = new TransformBlock<int, int>(item =>

  {

    if (item == 1)

      throw new InvalidOperationException("Blech.");

    return item * 2;

  });

  block.Post(1);

  await block.Completion;

}

catch (InvalidOperationException)

{

  // Здесь перехватывается исключение.

}

Когда вы распространяете завершение с помощью параметра связи PropagateCompletion, ошибки тоже распространяются. Однако исключение передается следующему блоку, упакованному в AggregateException. Следующий пример перехватывает исключение в конце конвейера, поэтому он перехватит AggregateException, если исключение было распространено из предыдущих блоков:

try

{

  var multiplyBlock = new TransformBlock<int, int>(item =>

  {

    if (item == 1)

      throw new InvalidOperationException("Blech.");

    return item * 2;

  });

  var subtractBlock = new TransformBlock<int, int>(item => item - 2);

  multiplyBlock.LinkTo(subtractBlock,

      new DataflowLinkOptions { PropagateCompletion = true });

  multiplyBlock.Post(1);

  await subtractBlock.Completion;

}

catch (AggregateException)

{

  // Здесь перехватывается исключение.

}

Каждый блок упаковывает входящие ошибки в AggregateException, даже если входящая ошибка уже представляет собой AggregateException. Если ошибка происходит на ранней стадии конвейера и успевает переместиться на несколько связей перед тем, как будет обнаружена, исходная ошибка будет упакована в AggregateException на нескольких уровнях. Метод AggregateException.Flatten упрощает обработку ошибок в этом сценарии.

Пояснение

Занимаясь построением сети (или конвейера), подумайте над тем, как будут обрабатываться ошибки. В более простых ситуациях может быть лучше распространить ошибки и перехватывать их все сразу в конце. В более сложных сетях вам, возможно, придется проверить каждый блок при завершении потока данных.

Также возможен другой вариант: если вы хотите, чтобы блоки сохраняли работоспособность перед лицом исключений, можно рассматривать исключения как другую разновидность данных и дать им проходить через сеть с правильно обрабатываемыми элементами данных. Использование этого паттерна позволит сохранить работоспособность сети потока данных, так как сами блоки не будут переходить в состояние отказа и продолжат обработку следующего элемента данных. За дополнительной информацией обращайтесь к рецепту 14.6.

Дополнительная информация

В рецепте 5.1 рассматриваются связи между блоками.

В рецепте 5.3 рассматривается разрыв связей между блоками.

В рецепте 14.6 рассматривается перемещение исключений вместе с данными в сети потока данных.

5.3. Удаление связей между блоками

Задача

В процессе обработки необходимо динамически изменить структуру потока данных. Это нетипичный сценарий, с которым вы вряд ли когда-либо столкнетесь в жизни.

Решение

Связи между блоками потока данных можно создавать или удалять в любой момент; данные могут свободно проходить по сети, и это не помешает безопасно создавать или удалять связи. Как создание, так и удаление связей являются полностью потокобезопасными.

При создании связи между блоками потока данных сохраните объект IDisposable, возвращенный методом LinkTo, и уничтожьте его, когда потребуется разорвать связь между блоками:

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);

var subtractBlock = new TransformBlock<int, int>(item => item - 2);

IDisposable link = multiplyBlock.LinkTo(subtractBlock);

multiplyBlock.Post(1);

multiplyBlock.Post(2);

// Удаление связей между блоками.

// Данные, отправленные выше, могут быть уже переданы

// или не переданы по связи. В реальном коде стоит рассмотреть

// возможность блока using вместо простого вызова Dispose.

link.Dispose();

Пояснение

Если нет гарантии, что связь бездействует, при ее удалении может возникнуть состояние гонки (race). Но состояние гонки обычно не создает проблем; данные либо проходят по связи, перед тем как она будет уничтожена, либо не проходят. Здесь нет условий гонки, которые могли бы привести к дублированию или потере данных.

Сценарий с разрывом связи нетипичен, но и он может пригодиться в некоторых ситуациях. Например, невозможно изменить фильтр для связи — придется удалить старую связь и создать новую с новым фильтром (возможно, с присваиванием DataflowLinkOptions.Append значения false). Также удаление связи в стратегической точке может использоваться для приостановки сети потока данных.

Дополнительная информация

В рецепте 5.1 рассматривается создание связей между блоками.

5.4. Регулирование блоков

Задача

Имеется сеть потока данных с ветвлением. Требуется организовать передачу данных с распределением нагрузки.

Решение

По умолчанию блок, генерирующий выходные данные, проверяет все свои связи (в порядке их создания) и пытается последовательно передавать данные по каждому каналу. Кроме того, по умолчанию каждый блок поддерживает входной буфер и принимает произвольное количество данных до того, как он будет готов их обработать.

Это создает проблему в сценарии с ветвлением, в котором один блок-источник связан с двумя блоками-приемниками: в этом случае второй блок будет простаивать. Первый блок-приемник всегда будет принимать данные и буферизовать их, так что блок-источник никогда не будет пытаться передавать данные второму блоку-приемнику. Проблему можно решить регулировкой (throttling) блоков-приемников с использованием параметра блока BoundedCapacity. По умолчанию BoundedCapacity присваивается значение DataflowBlockOptions.Unbounded, при котором первый блок-приемник буферизует все данные, даже если еще не готов к их обработке.

BoundedCapacity можно присвоить любое значение больше нуля (или, конечно, DataflowBlockOptions.Unbounded). Если блоки-приемники успевают обрабатывать данные, поступающие от блоков-источников, простого значения 1 будет достаточно:

var sourceBlock = new BufferBlock<int>();

var options = new DataflowBlockOptions { BoundedCapacity = 1 };

var targetBlockA = new BufferBlock<int>(options);

var targetBlockB = new BufferBlock<int>(options);

 

sourceBlock.LinkTo(targetBlockA);

sourceBlock.LinkTo(targetBlockB);

Пояснение

Регулировка полезна для распределения нагрузки в конфигурациях с ветвлением, но она также может применяться везде, где возникнет необходимость в регулировании. Например, если сеть потока данных заполняется данными от операции ввода/вывода, можно применить BoundedCapacity к блокам своей сети. В этом случае вы не прочитаете слишком много данных ввода/вывода до того, как сеть будет к этому готова, а все входные данные не будут буферизованы сетью до того, как она сможет его обработать.

Дополнительная информация

В рецепте 5.1 рассматривается связывание блоков.

5.5. Параллельная обработка с блоками потока данных

Задача

Требуется выполнить параллельную обработку в сети потока данных.

Решение

По умолчанию каждый блок потока данных не зависит от других блоков. Когда вы связываете два блока, они будут выполнять обработку независимо друг от друга. Таким образом, в каждую сеть потока данных встроена некоторая степень естественного параллелизма.

Если требуется выйти за эти рамки, — например, если один конкретный блок выполняет интенсивные вычисления на процессоре, — вы можете дать команду этому блоку работать параллельно с входными данными, устанавливая параметр MaxDegreeOfParallelism. По умолчанию этому параметру тоже присваивается значение 1, поэтому каждый блок потока данных обрабатывает только один фрагмент данных за раз.

BoundedCapacity можно присвоить DataflowBlockOptions.Unbounded или любoе значение, большее 0. Следующий пример позволяет любому количеству задач умножать данные одновременно:

var multiplyBlock = new TransformBlock<int, int>(

    item => item * 2,

    new ExecutionDataflowBlockOptions

    {

      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded

    });

var subtractBlock = new TransformBlock<int, int>(item => item - 2);

multiplyBlock.LinkTo(subtractBlock);

Пояснение

Параметр MaxDegreeOfParallelism упрощает организацию параллельной обработки в блоке. Сложнее определить, каким блокам это потребуется. Один из способов заключается в том, чтобы приостановить выполнение потока данных в отладчике и проанализировать количество элементов данных в очереди (т. е. элементов, которые еще не были обработаны блоком). Неожиданно высокое количество элементов данных может указывать на то, что реорганизация или параллелизация могли бы принести пользу.

MaxDegreeOfParallelism также работает и в том случае, если блок потока данных выполняет асинхронную обработку. В этом случае параметр MaxDegreeOfParallelism задает уровень параллелизма — определенное количество слотов. Каждый элемент данных занимает слот, когда блок приступает к его обработке, и покидает этот слот только при полном завершении асинхронной обработки.

Дополнительная информация

В рецепте 5.1 рассматривается связывание блоков.

5.6. Создание собственных блоков

Задача

Имеется некоторая логика, которую требуется разместить в нестандартном блоке потока данных. Это позволит создавать большие блоки, содержащие сложную логику.

Решение

Можно выделить любую часть сети потока данных, содержащую один входной и один выходной блок, с помощью метода Encapsulate. Encapsulate формирует блок по двум конечным точкам. Распространение данных и завершение между этими конечными точками остаются на вашей ответственности. Следующий код создает из двух блоков нестандартный блок потока данных с распространением данных и завершения:

IPropagatorBlock<int, int> CreateMyCustomBlock()

{

  var multiplyBlock = new TransformBlock<int, int>(item => item * 2);

  var addBlock = new TransformBlock<int, int>(item => item + 2);

  var divideBlock = new TransformBlock<int, int>(item => item / 2);

 

  var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };

  multiplyBlock.LinkTo(addBlock, flowCompletion);

  addBlock.LinkTo(divideBlock, flowCompletion);

 

  return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);

}

Пояснение

Инкапсулируя сеть в нестандартный блок, подумайте о том, какие параметры вы хотите предоставить пользователям. Подумайте, как каждый параметр блока должен (или не должен) передаваться вашей внутренней сети; во многих случаях некоторые параметры блоков неприменимы или не имеют смысла. По этой причине нестандартные блоки обычно определяют собственные нестандартные параметры вместо того, чтобы получать параметр DataflowBlockOptions.

DataflowBlock.Encapsulate инкапсулирует только сеть с одним входным и одним выходным блоками. Если у вас имеется сеть с несколькими входными и/или выходными блоками, предназначенная для повторного использования, вам следует инкапсулировать ее в специальном объекте и предоставить доступ к входам и выходам как к свойствам типа ITargetBlock<T> (для входов) и IReceivableSourceBlock<T> (для выходов).

Все эти примеры используют Encapsulate для создания нестандартного блока. Также возможно реализовать интерфейсы потока данных самостоятельно, но это намного сложнее. Компания Microsoft опубликовала статью (/) с описанием нетривиальных приемов создания нестандартных блоков потока данных.

Дополнительная информация

В рецепте 5.1 рассматривается связывание блоков.

В рецепте 5.2 рассматривается распространение ошибок по связям между каналами.

Назад: Глава 4. Основы параллельного программирования
Дальше: Глава 6. Основы System.Reactive