Конкурентность (concurrency) является ключевым аспектом построения красивых программ. В течение десятилетий конкурентность была возможна, но реализовывалась с изрядными трудностями. Конкурентные программы создавали трудности с написанием, отладкой и сопровождением. В итоге многие разработчики выбирали более простой путь и избегали конкурентности. Благодаря библиотекам и языковым средствам, доступным для современных программ .NET, в наши дни конкурентность достигается гораздо проще. Компания Microsoft стала лидером движения к существенному понижению планки сложности конкурентности. Когда-то конкурентное программирование было уделом экспертов; но в наши дни каждый разработчик может (и должен) владеть средствами конкурентности.
Прежде чем продолжать, стоит разобраться с терминами, которые будут использоваться в книге. Это мои собственные определения, которые я постоянно использую для того, чтобы различать разные методы программирования. Начнем с конкурентности.
Конкурентность
Выполнение сразу нескольких действий в одно и то же время.
Надеюсь, полезность конкурентности сомнений не вызывает. Приложения для конечного пользователя используют конкурентность, чтобы реагировать на ввод данных пользователем во время записи в базу данных. Серверные приложения используют конкурентность для реакции на второй запрос в ходе завершения первого запроса. Конкурентность пригодится в любой ситуации, когда приложение должно делать что-то одно во время работы над чем-то другим. Практически любое программное приложение может выиграть от применения конкурентности.
Многие разработчики, слыша термин «конкурентность», немедленно думают о «многопоточности». Тем не менее эти два понятия следует различать.
Многопоточность
Форма конкурентности, использующая несколько программных потоков выполнения.
Многопоточность относится к буквальному использованию нескольких потоков. Как было продемонстрировано во многих рецептах книги, многопоточность является разновидностью конкурентности, но, безусловно, это не единственная форма. Непосредственное использование низкоуровневых видов многопоточности в современных приложениях практически не имеет смысла; высокоуровневые абстракции превосходят многопоточные средства старой школы как по мощи, так и по эффективности. По этой причине рассмотрение устаревших средств будет сведено к минимуму. Ни в одном из многопоточных рецептов этой книги не используются типы Thread или BackgroundWorker; они были заменены более качественными альтернативами.
Как только вы вводите команду new Thread(), все кончено: ваш проект уже содержит устаревший код.
Только не подумайте, что многопоточность мертва! Многопоточность продолжает жить в пулах потоков — полезном месте для постановки рабочих операций в очередь, которое автоматически регулируется в зависимости от нагрузки. В свою очередь, с пулом потоков становится возможной одна важная форма конкурентности: параллельная обработка.
Параллельная обработка
Выполнение большого объема работы за счет распределения ее между несколькими потоками, выполняемыми одновременно.
Параллельная обработка (или параллельное программирование) использует многопоточность для максимально эффективного использования многоядерных процессоров. Современные процессоры часто имеют несколько ядер, и при большом объеме выполняемой работы было бы неразумно поручать всю работу одному ядру, в то время как остальные простаивают. Параллельная обработка распределяет работу между несколькими потоками, каждый из которых может выполняться независимо на отдельном ядре.
Параллельная обработка является одной из разновидностей многопоточности, а многопоточность является одной из разновидностей конкурентности. Также существует другая разновидность конкурентности, которая важна в современных приложениях, но не так хорошо знакома многим разработчикам: асинхронное программирование.
Асинхронное программирование
Разновидность конкурентности, использующая обещания или обратные вызовы для предотвращения создания лишних потоков.
Обещание (future/promise), или преднамеченный тип — тип, представляющий некоторую операцию, которая завершится в будущем. Примеры современных типов обещаний в .NET — Task и Task<TResult>. Более старые асинхронные API используют обратные вызовы или события вместо обещаний. В асинхронном программировании центральное место занимает идея асинхронной операции — некоторой запущенной операции, которая завершится через некоторое время. Хотя операция продолжается, она не блокирует исходный поток; поток, который запустил операцию, свободен для выполнения другой работы. Когда операция завершится, она уведомляет свое обещание или активизирует обратный вызов или событие, чтобы приложение узнало о завершении.
Асинхронное программирование — мощная разновидность конкурентности, оно до недавнего времени требовало чрезвычайно сложного кода. Благодаря поддержке async и await в современных языках асинхронное программирование становится почти таким же простым, как и синхронное (неконкурентности) программирование.
Другая форма конкурентности — реактивное программирование (reactive programming). Асинхронное программирование подразумевает, что приложение запускает операцию, которая завершится в будущем. Реактивное программирование тесно связано с асинхронным программированием, но в его основе лежат асинхронные события вместо асинхронных операций. Асинхронные события могут не иметь фактического «начала», могут происходить в любое время и могут инициироваться многократно. Один из примеров такого рода — ввод данных пользователем.
Реактивное программирование
Декларативный стиль программирования, при котором приложение реагирует на события.
Если рассматривать приложение как огромный конечный автомат, поведение приложения может быть описано как реакция на серию событий с обновлением своего состояния на каждое событие. Это не настолько абстрактное или теоретическое определение, как может показаться: с современными фреймворками этот метод весьма полезен в реальных приложениях. Реактивное программирование не обязательно конкурентно, но оно тесно связано с конкурентностью, поэтому в книге будут изложены его основы.
Обычно при написании конкурентной программы применяется комбинация разных методов. В большинстве приложений используется как минимум многопоточность (через пул потоков) и асинхронное программирование. Вы можете свободно смешивать разные формы конкурентности, используя подходящий инструмент для каждой части приложения.
Асинхронное программирование обладает двумя главными преимуществами. Первое характерно для программ с графическим интерфейсом (GUI), предназначенных для пользователя: асинхронное программирование обеспечивает быстрый отклик. Каждому из нас попадались программы, которые вдруг зависают во время работы; асинхронная программа сможет быстро реагировать на действия пользователя во время работы. Второе преимущество характерно для программ, работающих на стороне сервера: асинхронное программирование обеспечивает масштабируемость. Серверное приложение может в некоторой степени масштабироваться за счет использования пула потоков, но асинхронное серверное приложение обычно обладает на порядок лучшими возможностями масштабирования.
Оба преимущества асинхронного программирования обусловлены одним и тем же аспектом: асинхронное программирование освобождает потоки. Для GUI-программ асинхронное программирование освобождает UI-поток; это позволяет графическому приложению сохранить высокую скорость отклика на ввод пользователя. Для серверных приложений асинхронное программирование освобождает потоки запросов и позволяет серверу использовать свои потоки для обслуживания большего количества запросов.
В современных асинхронных приложениях .NET используются два ключевых слова: async и await. Ключевое слово async добавляется в объявление метода и имеет двойное назначение: оно разрешает использование ключевого слова await внутри этого метода и приказывает компилятору сгенерировать для этого метода конечный автомат по аналогии с тем, как работает yield return. Метод с ключевым словом async может вернуть Task<TResult>, если он возвращает значение; Task — если он не возвращает значения; или любой другой «сходный» тип — такой, как ValueTask. Кроме того, async-метод может вернуть IAsyncEnumerable<T> или IAsyncEnumerator<T>, если он возвращает несколько значений в перечислении. «Сходные» типы представляют обещания; они могут уведомлять вызывающий код о завершении async-метода.
Избегайте async void! Возможно создать async-метод, который возвращает void, но это следует делать только при написании async-обработчика событий. Обычный async-метод без возвращаемого значения должен возвращать Task, а не void.
С учетом всего сказанного рассмотрим короткий пример:
async Task DoSomethingAsync()
{
int value = 13;
// Асинхронно ожидать 1 секунду.
await Task.Delay(TimeSpan.FromSeconds(1));
value *= 2;
// Асинхронно ожидать 1 секунду.
await Task.Delay(TimeSpan.FromSeconds(1));
Trace.WriteLine(value);
}
async-метод начинает выполняться синхронно, как и любой другой метод. Внутри async-метода команда await выполняет асинхронное ожидание по своему аргументу. Сначала она проверяет, завершилась ли операция: если да, то метод продолжает выполняться (синхронно). В противном случае await приостанавливает async-метод и возвращает незавершенную задачу. Когда операция завершится позднее, метод async продолжает выполнение.
async-метод может рассматриваться как состоящий из нескольких синхронных частей, разделенных командами await. Первая синхронная часть выполняется в потоке, который вызвал метод, но где выполняются другие синхронные части? Ответ на этот вопрос не прост.
При выполнении await для задачи (самый распространенный сценарий) в момент, когда await решает приостановить метод, сохраняется контекст. Это текущий объект SynchronizationContext, если только он не равен null (в этом случае контекстом является текущий объект TaskScheduler). Метод возобновляет выполнение в этом сохраненном контексте. Обычно контекстом является UI-контекст (для UI-потока) или контекст пула потоков (в большинстве других ситуаций). Если вы пишете приложение ASP.NET Classic (до Core), то контекстом также может быть контекст запроса ASP.NET. В ASP.NET Core используется контекст пула потоков вместо специального контекста запроса.
Таким образом, в приведенном коде все синхронные части пытаются возобновить продолжение в исходном контексте. Если вызвать метод DoSomethingAsync из UI-потока, каждая из его синхронных частей будет выполняться в этом UI-потоке, но если вызвать его из потока из пула потоков, то каждая из синхронных частей будет выполняться в любом потоке из пула потоков.
Чтобы обойти это поведение по умолчанию, можно выполнить await по результату метода расширения ConfigureAwait с передачей false в параметре continueOnCapturedContext. Следующий код начинает выполнение в вызывающем потоке, а после приостановки await он возобновляет выполнение в потоке из пула потоков:
async Task DoSomethingAsync()
{
int value = 13;
// Асинхронно ожидать 1 секунду.
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
value *= 2;
// Асинхронно ожидать 1 секунду.
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
Trace.WriteLine(value);
}
Хорошей практикой программирования считается вызывать ConfigureAwait в базовых «библиотечных» методах и возобновлять контекст только тогда, когда потребуется — в ваших внешних методах «пользовательского интерфейса».
Ключевое слово await не ограничивается работой с задачами, оно может работать с любым объектом, допускающим ожидание (awaitable), построенным по определенной схеме. Например, библиотека Base Class Library включает тип ValueTask<T>, который сокращает затраты памяти, если результат в основном является синхронным; например, если результат может быть прочитан из кэша в памяти. Тип ValueTask<T> не преобразуется в Task<T> напрямую, но строится по схеме, допускающей ожидание, поэтому может использоваться с await. Также существуют другие примеры, и вы можете строить свои собственные, но в большинстве случаев await получает Task или Task<TResult>.
Существует два основных способа создания экземпляров Task. Некоторые задачи представляют реальный код, который должен выполняться процессором; такие вычислительные задачи должны создаваться вызовом Task.Run (или TaskFactory.StartNew, если они должны выполняться по определенному расписанию). Другие задачи представляют уведомления; такие задачи, основанные на событиях, создаются TaskCompletionSource<TResult> (или одной из сокращенных форм). Большинство задач ввода/вывода использует TaskCompletionSource<TResult>.
Обработка ошибок с async и await выглядит логично. В следующем фрагменте кода PossibleExceptionAsync может выдать исключение NotSupportedException, но TrySomethingAsync может перехватить исключение естественным образом. Трассировка стека перехваченного исключения сохраняется без искусственной упаковки в TargetInvocationException или AggregateException:
async Task TrySomethingAsync()
{
try
{
await PossibleExceptionAsync();
}
catch (NotSupportedException ex)
{
LogException(ex);
throw;
}
}
Когда async-метод выдает (или распространяет) исключение, оно помещается в возвращаемый объект Task, и задача Task завершается. При выполнении await для этого объекта Task оператор await получает это исключение и (заново) выдает его так, что исходная трассировка стека сохраняется. Такой код, как в примере ниже, будет работать так, как ожидается, если PossibleExceptionAsync является async-методом:
async Task TrySomethingAsync()
{
// Исключение попадает в Task, а не выдается напрямую.
Task task = PossibleExceptionAsync();
try
{
// Исключение из Task exception будет выдано здесь, в точке await.
await task;
}
catch (NotSupportedException ex)
{
LogException(ex);
throw;
}
}
Относительно async-методов существует одна важная рекомендация: при использовании ключевого слова async лучше позволить ему распространяться в вашем коде. Если вы вызываете async-метод, следует (в конечном итоге) выполнить await для возвращаемой им задачи. Боритесь с искушением вызвать Task.Wait, Task<TResult>.Result или GetAwaiter().GetResult(): это приведет к взаимоблокировке (deadlock). Рассмотрим следующий метод:
async Task WaitAsync()
{
// await сохранит текущий контекст ...
await Task.Delay(TimeSpan.FromSeconds(1));
// ... и попытается возобновить метод в этой точке с этим контекстом.
}
void Deadlock()
{
// Начать задержку.
Task task = WaitAsync();
// Синхронное блокирование с ожиданием завершения async-метода.
task.Wait();
}
Код в этом примере создаст взаимоблокировку при вызове из UI-контекста или контекста ASP.NET Classic, потому что оба эти контекста допускают выполнение только одного потока. Deadlock вызовет WaitAsync, что приводит к началу задержки. Затем Deadlock (синхронно) ожидает завершения этого метода с блокированием контекстного потока. Когда задержка завершится, await пытается возобновить WaitAsync в сохраненном контексте, но не сможет, так как в контексте уже есть заблокированный поток, а контекст допускает только один поток в любой момент времени. Взаимоблокировку можно предотвратить двумя способами: использовать ConfigureAwait(false) в WaitAsync (что заставляет await игнорировать его контекст) или же использовать await с вызовом WaitAsync (что превращает Deadlock в async-метод).
Используйте async по полной программе.
Если вы хотите более подробно изучить async, компания Microsoft предоставляет великолепную документацию по этой теме. Рекомендую прочитать по крайней мере обзор «Asynchronous Programming» и «Task-based Asynchronous Pattern (TAP)». Кроме этого, также имеется документация «Async in Depth».
Асинхронные потоки берут основу async и await и расширяют ее для работы с множественными значениями. Асинхронные потоки строятся на основе концепции асинхронных перечисляемых объектов, которые похожи на обычные перечисляемые объекты (enumerables), за исключением того, что позволяют выполнить асинхронную работу при получении следующего элемента последовательности. Это исключительно мощная концепция, которая более подробно рассматривается в главе 3. Асинхронные потоки особенно полезны тогда, когда имеется последовательность данных, поступающих либо поодиночке, либо блоками. Например, если приложение обрабатывает ответ API, в котором используется разбиение на страницы с параметрами limit и offset, асинхронные потоки могут стать идеальной абстракцией. На момент написания книги асинхронные потоки были доступны только на новейших платформах .NET.
Параллельное программирование следует использовать в любой ситуации, в которой серьезный объем вычислительной работы может быть разделен на независимые блоки. Параллельное программирование временно повышает загрузку процессора для улучшения пропускной способности системы; это может быть полезно в клиентских системах, в которых процессор часто простаивает, но в серверных системах обычно неуместно. У большинства серверов присутствуют некоторые встроенные средства параллелизма; например, ASP.NET обрабатывает несколько запросов параллельно. Написание параллельного кода на сервере может приносить пользу в некоторых ситуациях (если вам известно, что количество одновременно обслуживаемых пользователей всегда будет низким), но, как правило, параллельное программирование на сервере будет конфликтовать со встроенными параллельными средствами и не принесет никакой реальной пользы.
Есть две формы параллельного программирования: параллелизм данных и параллелизм задач. Параллелизм данных возникает тогда, когда имеется набор элементов данных, ожидающих обработки, и обработка каждого фрагмента данных в основном не зависит от других фрагментов. Под параллелизмом задач понимается такая ситуация, в которой имеется некоторый пул работы, где каждый фрагмент работы в основном не зависит от остальных. Параллелизм задач может быть динамическим — если один фрагмент работы порождает несколько дополнительных фрагментов работы, они могут быть добавлены в пул работы.
Известно несколько разных подходов к реализации параллелизма данных. Метод Parallel.ForEach является аналогом цикла foreach и должен использоваться там, где это возможно. Parallel.ForEach рассматривается в рецепте 4.1. Класс Parallel также поддерживает Parallel.For — аналог цикла for и может использоваться, если обработка данных зависит от индекса. Код, использующий Parallel.ForEach, выглядит примерно так:
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}
Другой вариант — PLINQ (Parallel LINQ), предоставляющий метод расширения AsParallel для запросов LINQ. Parallel более эффективно расходует ресурсы, чем PLINQ; Parallel лучше сосуществует с другими процессами в системе, тогда как PLINQ (по умолчанию) будет пытаться распространиться по всем процессорам. К недостаткам Parallel следует отнести то, что он требует более явной реализации; PLINQ во многих случаях позволяет писать более элегантный код. PLINQ рассматривается в рецепте 4.5 и выглядит примерно так:
IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
return values.AsParallel().Select(value => IsPrime(value));
}
Что бы вы ни выбрали, есть одна рекомендация, которая справедлива при выполнении параллельной обработки.
Блоки работы должны быть независимы друг от друга настолько, насколько это возможно.
Независимость блока работы от всех остальных блоков обеспечивает максимизацию параллелизма. Как только вы начнете использовать совместный доступ к состоянию в разных потоках, придется синхронизировать доступ к общему состоянию, и ваше приложение становится менее параллельным. Синхронизация более подробно рассматривается в главе 12.
Результаты параллельной обработки могут обрабатываться разными способами. Выход можно поместить в некоторую разновидность конкурентной коллекции или же провести агрегирование результатов для получения сводного показателя. Агрегирование часто применяется в параллельной обработке; такая разновидность функциональности «отображение/свертка» также поддерживается перегруженными версиями методов класса Parallel. Агрегирование более подробно рассматривается в рецепте 4.2.
Давайте рассмотрим параллелизм задач. Параллелизм данных ориентирован на обработку данных, а параллелизм задач — на выполнение работы. На высоком уровне между параллелизмом данных и параллелизмом задач есть много общего; «обработка данных» может рассматриваться как разновидность «работы». Многие задачи параллелизма могут решаться любым из этих способов; используйте тот API, который покажется вам более естественным для текущей задачи.
Parallel.Invoke — одна из разновидностей метода Parallel, которая реализует разновидность параллелизма задач типа «ветвление/объединение». Этот метод рассматривается в рецепте 4.3; вы просто передаете делегатов, которые должны выполняться параллельно:
void ProcessArray(double[] array)
{
Parallel.Invoke(
() => ProcessPartialArray(array, 0, array.Length / 2),
() => ProcessPartialArray(array, array.Length / 2, array.Length)
);
}
void ProcessPartialArray(double[] array, int begin, int end)
{
// Действия, интенсивно использующие процессор...
}
Тип Task изначально был разработан для параллелизма задач, хотя он также использовался для асинхронного программирования. Экземпляр Task — в том виде, в котором используется в параллелизме задач, — представляет некоторую работу. Метод Wait может использоваться для ожидания завершения задачи, а свойства Result и Exception — для получения результатов этой работы. Код, использующий Task напрямую, сложнее кода, в котором используется Parallel, но и он может быть полезным, если структура параллелизма неизвестна до стадии выполнения. С этой разновидностью динамического параллелизма количество необходимых фрагментов работы неизвестно до начала обработки; это выясняется во время выполнения. В общем случае динамический фрагмент работы должен запускать все дочерние задачи, необходимые ему, а затем ожидать их завершения. У типа Task имеется специальный флаг TaskCreationOptions.AttachedToParent, который может использоваться для этой цели. Динамический параллелизм рассматривается в рецепте 4.4.
Параллелизм задач должен стремиться к независимости составляющих, как и параллелизм данных. Чем более независимы ваши делегаты, тем эффективнее программа. Кроме того, если делегаты зависимы друг от друга, их придется синхронизировать, а синхронизация усложняет написание правильного кода. При параллелизме задач следует особенно внимательно следить за переменными, сохраненными в замыканиях (closures). Помните, что в замыканиях сохраняются ссылки (а не значения), и это может привести к неочевидным ситуациям с совместным использованием данных.
Обработка ошибок при всех типах параллелизма организуется аналогично. Так как операции выполняются параллельно, в программе могут возникнуть множественные исключения, поэтому они упаковываются в исключение AggregateException, запускаемое в ваш код. Это поведение последовательно реализуется для Parallel.ForEach, Parallel.Invoke, Task.Wait и т.д. Тип AggregateException содержит полезные методы Flatten и Handle, упрощающие код обработки ошибок:
try
{
Parallel.Invoke(() => { throw new Exception(); },
() => { throw new Exception(); });
}
catch (AggregateException ex)
{
ex.Handle(exception =>
{
Trace.WriteLine(exception);
return true; // "обработано"
});
}
Обычно не приходится беспокоиться о том, как пул потоков организует выполнение работы. Параллелизм данных и задач используют динамически регулируемые распределители (partitioners) для распределения работы между рабочими потоками. Пул потоков увеличивает количество потоков по мере необходимости. Он имеет одну рабочую очередь, и каждый поток из пула потоков использует собственную рабочую очередь. Когда поток из пула ставит в очередь дополнительную работу, то сначала отправляет ее в свою очередь, так как работа обычно связывается с текущим рабочим элементом (work item); такое поведение заставляет потоки заниматься своей собственной частью работы и максимизирует процент попаданий в кэш. Если у другого потока нет работы, он забирает работу из очереди другого потока. Компания Microsoft потратила много сил на то, чтобы пул потоков по возможности работал эффективно; существует множество настроек, которые можно изменять для достижения максимального быстродействия. Если ваши задачи не слишком малы, они должны хорошо работать с настройками по умолчанию.
Задачи должны быть ни слишком короткими, ни слишком длинными.
Если задачи получаются слишком короткими, то затраты ресурсов на разбиение данных на задачи и планирование этих задач в пуле потоков начинают играть значительную роль. Если задачи слишком длинные, то пул потоков не может динамически регулировать равномерное распределение работы. Трудно заранее определить, какую задачу следует считать «слишком короткой» или «слишком длинной»; это зависит от решаемой задачи и приблизительных возможностей оборудования. Как правило, я стараюсь делать свои задачи как можно более короткими без создания проблем быстродействия (если быстродействие внезапно падает, значит задачи слишком короткие). Еще лучше не работать с задачами напрямую, а воспользоваться типом Parallel или PLINQ. Эти высокоуровневые формы параллелизма содержат встроенные средства распределения работы, которые решают эту задачу за вас (и вносят необходимые корректировки во время выполнения).
Если вы хотите глубже изучить тему параллельного программирования, то лучшая книга по этой теме — «Parallel Programming with Microsoft .NET» Колина Кэмпбелла и др. (Colin Campbell et al., Microsoft Press).
Изучение реактивного программирования занимает больше времени, чем другие формы конкурентности, а сопровождение кода создает больше проблем, если только вы не эксперт в области реактивного программирования. Но если вы не пожалеете времени и сил, реактивное программирование открывает исключительно мощные возможности. Реактивное программирование позволяет рассматривать поток событий как поток данных. Как правило, если событию передаются какие-либо аргументы, то в коде лучше использовать System.Reactive вместо обычного обработчика событий.
Ранее пакет System.Reactive назывался Reactive Extensions; это название часто сокращалось до «Rx.» Все три термина относятся к одной технологии.
Реактивное программирование основано на концепции наблюдаемых потоков (observable streams). Подписавшись на наблюдаемый поток, вы будете получать любое количество элементов данных (OnNext); поток может завершиться одной ошибкой (OnError) или уведомлением «конец потока» (OnCompleted). Некоторые наблюдаемые потоки никогда не завершаются. Реальные интерфейсы выглядят так:
interface IObserver<in T>
{
void OnNext(T item);
void OnCompleted();
void OnError(Exception error);
}
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<TResult> observer);
}
Однако вам никогда не придется реализовать эти интерфейсы. Библиотека System.Reactive (Rx) компании Microsoft содержит все реализации, которые могут понадобиться. Код Reactive в конечном итоге очень похож на LINQ; его можно рассматривать как своего рода «LINQ to Events». System.Reactive содержит все возможности LINQ, а также добавляет большое количество собственных операторов — особенно предназначенных для работы со временем. Следующий код начинается с незнакомых операторов (Interval и Timestamp) и завершается Subscribe, но в середине находятся операторы Where и Select, которые должны быть знакомы вам по LINQ:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp)
.Subscribe(x => Trace.WriteLine(x));
Пример кода начинается с запуска счетчика по периодическому таймеру (Interval) и добавления временной метки для каждого события (Timestamp). Затем события фильтруются так, чтобы включались только четные значения счетчика (Where), выбираются значения временной метки (Timestamp), после чего каждое поступившее значение временной метки записывается в отладчик (Subscribe). Не беспокойтесь, если новые операторы (например, Interval) покажутся непонятными; мы рассмотрим их позже. Пока просто помните, что это запрос LINQ, очень похожий на уже знакомые вам. Главное отличие заключается в том, что LINQ to Objects и LINQ to Entities используют модель вытягивания (pull model), при которой перечисление запроса LINQ «вытягивает» данные из запроса, тогда как LINQ to Events (System.Reactive) использует модель проталкивания (push model), при которой события поступают и перемещаются по запросу сами по себе.
Определение наблюдаемого потока не зависит от его подписок. Последний пример эквивалентен следующему коду:
IObservable<DateTimeOffset> timestamps =
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp);
timestamps.Subscribe(x => Trace.WriteLine(x));
Для типа нормально определять наблюдаемые потоки и делать их доступными в виде ресурса IObservable<TResult>. Затем другие типы могут подписываться на эти потоки или объединять их с другими операторами для создания другого наблюдаемого потока.
Подписка System.Reactive также является ресурсом. Операторы Subscribe возвращают реализацию IDisposable, которая представляет подписку. Когда ваш код завершит прослушивание наблюдаемого потока, он должен прекратить свою подписку.
Подписки ведут себя по-разному с холодными и горячими наблюдаемыми объектами. Горячий (hot) наблюдаемый объект представляет собой поток событий, который всегда находится в движении, и, если при появлении события нет ни одного подписчика, оно теряется. Например, перемещение мыши является горячим наблюдаемым событием. У холодного (cold) наблюдаемого объекта события не поступают постоянно. Холодный наблюдаемый объект реагирует на подписку, начиная последовательность событий. Например, загрузка HTTP является холодным наблюдаемым объектом; подписка инициирует отправку запроса HTTP.
Оператор Subscribe также всегда должен получать параметр обработки ошибок. В предыдущих примерах этого параметра нет; ниже приведен более правильный пример, который будет правильно реагировать, если наблюдаемый поток завершается с ошибкой:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp)
.Subscribe(x => Trace.WriteLine(x),
ex => Trace.WriteLine(ex));
Subject<TResult> — один из типов, который может пригодиться при экспериментах с System.Reactive. Он напоминает ручную реализацию наблюдаемого потока. Ваш код может вызывать OnNext, OnError и OnCompleted, а объект будет передавать эти вызовы своим подписчикам. Subject<TResult> хорошо подходит для экспериментов, но в реально эксплуатируемом коде лучше использовать операторы вроде тех, которые показаны в главе 6.
Существует множество полезных операторов System.Reactive, и в этой книге рассматриваются лишь отдельные примеры. За дополнительной информацией о System.Reactive рекомендую обращаться к превосходной электронной книге «Introduction to Rx» (/).
Библиотека TPL Dataflow — интересное сочетание асинхронных и параллельных технологий. Эта библиотека может быть полезной для последовательности процессов, которые должны применяться к вашим данным. Представьте, что нужно загрузить данные по URL-адресу, разобрать их, а затем обработать параллельно с другими данными. TPL Dataflow обычно используется в качестве простого конвейера: данные входят с одного конца и перемещаются, пока не выйдут с другого конца. Однако возможности TPL Dataflow этим далеко не ограничиваются; библиотека способна справиться с сетчатыми (mesh) структурами любого типа. Вы можете определять в сетях ветвления, объединения и циклы — TPL Dataflow обработает их так, как нужно. Но в большинстве случаев сети TPL Dataflow используются как конвейеры.
Базовым структурным элементом сети потока данных (dataflow mesh) является блок потока данных (dataflow block). Блок может быть блоком-приемником (получение данных), блоком-источником (производство данных) или их сочетанием. Блоки-источники могут связываться с блоками-приемниками для формирования сети; связывание рассматривается в рецепте 5.1. Блоки являются полунезависимыми; они обрабатывают данные по мере поступления и передают результат дальше. В обычном способе использования TPL Dataflow вы создаете все блоки, устанавливаете связи между ними, а затем начинаете подавать данные с одного конца. Данные после этого выходят с другого конца сами по себе. Еще раз уточню, что возможности потоков данных этим не ограничиваются; можно создавать связи и добавлять их в сеть в то время, когда по ним перемещаются данные, но это весьма нетривиальный сценарий.
Блоки-приемники содержат буферы для получаемых данных. Наличие буфера позволяет им получать новые элементы данных даже в том случае, если они еще не готовы к их обработке; это позволяет данным перемещаться по сети. Такая буферизация может создать проблемы в сценариях с ветвлением, в которых один блок-источник связывается с двумя блоками-приемниками. Если у блока-источника имеются данные для отправки по направлению потока, он начинает предлагать их своим связанным блокам по одному. По умолчанию первый блок-приемник просто получает данные и буферизует их, а второй блок-приемник эти данные никогда не получит. Проблема решается ограничением буферов блоков-приемников; эта тема рассматривается в рецепте 5.4.
Если что-то пойдет не так, происходит отказ блока — например, если обрабатывающий делегат выдает исключение при обработке элемента данных. Когда в блоке происходит отказ, он перестает получать данные. По умолчанию это не приводит к нарушению работоспособности всей сети, а позволяет перестроить эту часть сети или перенаправить данные. Тем не менее это нетривиальный сценарий; в большинстве случаев обычно нужно, чтобы отказы распространялись по связям к целевым блокам. Поток данных тоже поддерживает этот вариант; единственный неочевидный аспект — исключение, распространяемое по связям, упаковывается в AggregateException. Следовательно, при длинном конвейере могут появляться исключения с большой глубиной вложенности; проблему можно обойти с помощью метода AggregateException.Flatten:
try
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock,
new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(1);
subtractBlock.Completion.Wait();
}
catch (AggregateException exception)
{
AggregateException ex = exception.Flatten();
Trace.WriteLine(ex.InnerException);
}
Обработка ошибок в потоках данных более подробно рассматривается в рецепте 5.2.
На первый взгляд сети потоков данных очень похожи на наблюдаемые потоки. Как у сетей, так и у потоков существует концепция элементов данных, которые в них перемещаются. Кроме того, у сетей и у потоков есть концепции нормального завершения (уведомление о том, что данные перестали поступать) и завершения с отказом (уведомление о том, что в ходе обработки данных произошла некоторая ошибка). Но System.Reactive (Rx) и TPL Dataflow обладают разными возможностями. Наблюдаемые объекты Rx в общем случае лучше блоков потока данных при выполнении любых операций, связанных с хронометражом. Блоки потоков данных в общем случае лучше наблюдаемых объектов Rx при выполнении параллельной обработки. На концептуальном уровне работа Rx напоминает настройку обратных вызовов: каждый шаг наблюдаемого объекта напрямую вызывает следующий шаг. С другой стороны, каждый блок в сети потока данных практически независим от всех остальных блоков. Как Rx, так и TPL Dataflow имеют собственные области применения, которые отчасти перекрываются. Они также хорошо работают вместе; в рецепте 8.8 рассматриваются возможности взаимодействия между Rx и TPL Dataflow.
Если вы знакомы с акторскими фреймворками, то увидите, что TPL Dataflow на первый взгляд имеет ряд общих черт с ними. Каждый блок потока данных независим от других — он запускает задачи для выполнения работы по мере необходимости (например, выполнения преобразующего делегата или передачи вывода следующему блоку). Можно также настроить каждый блок для параллельного выполнения, чтобы он запускал несколько задач для обработки дополнительного ввода. Из-за этого поведения каждый блок отчасти напоминает актора в акторских фреймворках. Но TPL Dataflow не является полноценным акторским фреймворком; в частности, отсутствует встроенная поддержка корректного восстановления после ошибок или повторных попыток. TPL Dataflow — библиотека с функциональностью, сходной с функциональностью акторов, но не являющаяся полноценным акторским фреймворком.
Самые распространенные типы блоков — TransformBlock<TInput, TOutput> (аналог LINQ Select), TransformManyBlock<TInput, TOutput> (аналог LINQ SelectMany) и ActionBlock<TResult>, выполняющий делегата для каждого элемента данных. За дополнительной информацией о TPL Dataflow я рекомендую обращаться к документации MSDN () и руководству «Guide to Implementing Custom TPL Dataflow Blocks» ().
Поток является независимым исполнителем (executor). Каждый процесс состоит из нескольких потоков, и все эти потоки могут выполнять разные операции одновременно. Каждый поток имеет собственный независимый стек, но он совместно использует память со всеми остальными потоками процесса. В некоторых приложениях существует один специальный поток. Например, приложения с пользовательским интерфейсом имеют один специальный UI-поток, а у консольных приложений существует один специальный главный поток.
У каждого приложения .NET имеется пул потоков. Пул потоков содержит набор рабочих потоков, готовых к выполнению любой работы, которая им будет назначена. Пул потоков отвечает за определение количества потоков, находящихся в пуле потоков в любой момент времени. Есть десятки настроек конфигурации, с которыми можно экспериментировать для изменения этого поведения, но я не рекомендую это делать; пул потоков был тщательно оптимизирован для большинства реальных ситуаций.
Создавать новые потоки самостоятельно вам не потребуется. Единственная ситуация, в которой может возникнуть необходимость в создании экземпляров Thread, — создание потоков STA для COM-взаимодействий.
Поток относится к низкоуровневым абстракциям. Пул потоков находится на чуть более высоком уровне абстракции; когда код ставит работу в очередь пула потоков, то сам пул потоков в случае необходимости позаботится о создании потока. Абстракции, рассмотренные в книге, находятся на еще более высоком уровне: параллельная обработка и потоки данных ставят работу в очередь пула потоков по мере необходимости. Код, использующий эти высокоуровневые абстракции, пишется проще, чем код, работающий с низкоуровневыми абстракциями.
По этой причине типы Thread и BackgroundWorker в книге не рассматриваются вообще. Их время прошло.
Существует пара разновидностей коллекций, которые могут принести пользу при конкурентном программировании: конкурентные коллекции и неизменяемые коллекции. Обе категории коллекций рассматриваются в главе 9. Конкурентные коллекции позволяют нескольким потокам обновлять их одновременно с обеспечением безопасности. Многие конкурентные коллекции используют снимки (snapshots) текущего состояния, чтобы один поток мог перечислять значения, пока другой может добавлять или удалять значения. Конкурентные коллекции обычно работают эффективнее простой защиты обычной коллекции с помощью блокировок (lock).
С неизменяемыми коллекциями дело обстоит иначе. Неизменяемая коллекция действительно не может изменяться; вместо этого для модификации неизменяемой коллекции создается новая коллекция, представляющая измененную коллекцию. Это может показаться ужасно неэффективным, но неизменяемые коллекции разделяют максимально возможный объем памяти между экземплярами коллекций, поэтому все не так плохо. Одно из достоинств неизменяемых коллекций заключается в том, что все их операции являются чистыми, поэтому они очень хорошо работают в сочетании с функциональным кодом.
У многих современных технологий есть одно сходство: они функциональны по своей природе. В данном случае речь идет не о функциональности в том смысле, что «они делают то, что положено», а в смысле стиля программирования, основанного на композиции функций. И если вы возьмете на вооружение функциональный менталитет, ваши конкурентные архитектуры будут менее запутанными.
Одним из принципов функционального программирования является чистота (т. е. отсутствие побочных эффектов). Каждый компонент решения получает некоторое значение(-я) на входе и выдает некоторое значение(-я) на выходе. Эти компоненты должны настолько, насколько это возможно, избегать зависимости этих компонентов от глобальных (или общих) переменных или обновления глобальных (или общих) структур данных. Это справедливо для любых компонентов: async-методов, параллельных задач, операций System.Reactive или блоков потоков данных. Конечно, рано или поздно ваши вычисления должны на что-то повлиять, но код будет более элегантным, если вы сможете провести обработку в чистых блоках, а затем проводить обновления с результатами.
Другой принцип функционального программирования — неизменяемость. Неизменяемость означает, что блок данных не может изменяться. Почему неизменяемость полезна в конкурентных программах? Одна из причин заключается в том, что для неизменяемых данных не нужна синхронизация; если данные не могут измениться, то синхронизация становится излишней. Кроме того, неизменяемые данные помогают предотвратить побочные эффекты. Разработчики все чаще используют неизменяемые типы, и в книге представлено несколько рецептов, в которых рассматриваются неизменяемые структуры данных.
Фреймворк .NET в некоторой степени поддерживает асинхронное программирование с самых первых версий. Тем не менее асинхронное программирование было достаточно трудным делом до 2012 года, когда в .NET 4.5 (вместе с C# 5.0 и VB 2012) появились ключевые слова async и await. В этой книге во всех асинхронных рецептах используется современный подход с async/await и есть рецепты, демонстрирующие взаимодействие async и более старых паттернов асинхронного программирования. Если вам понадобится поддержка старых платформ, обращайтесь к приложению А.
Библиотека Task Parallel Library была представлена в .NET 4.0 с полной поддержкой как параллелизма данных, так и параллелизма задач. В наши дни она доступна даже на платформах с меньшими ресурсами, включая мобильные телефоны. Библиотека TPL построена на базе .NET.
Команда разработчиков System.Reactive приложила немало усилий для поддержки максимального количества платформ. System.Reactive, как и async с await, предоставляет полезные возможности для любых типов приложений — как клиентских, так и серверных. Поддержка System.Reactive доступна в пакете System.Reactive.
Библиотека TPL Dataflow официально распространяется в составе пакета NuGet для System.Threading.Tasks.Dataflow.
Многие конкурентные коллекции встроены в .NET; также существуют другие конкурентные коллекции, содержащиеся в пакете System.Threading.Channels. Неизменяемые коллекции доступны в пакете System.Collections.Immutable.