Если в вашем приложении используется конкурентность (как практически во всех приложениях .NET), следует остерегаться ситуаций, в которых один блок кода должен обновлять данные, пока другой код обращается к тем же данным. Столкнувшись с подобной ситуацией, необходимо синхронизировать доступ к данным. В рецептах этой главы рассматриваются наиболее распространенные типы, используемые для синхронизации доступа. Впрочем, если вы будете правильно пользоваться другими рецептами из книги, то увидите, что большая часть синхронизации в типичных ситуациях уже реализуется за вас соответствующими библиотеками. Но прежде чем углубляться в рецепты, посвященные синхронизации, будут описаны некоторые типичные ситуации, в которых может появиться такая необходимость.
Описания синхронизации в этом разделе несколько упрощены, но все заключения правильны.
Существует две основные разновидности синхронизации: передачи данных и защиты данных. Синхронизация при передаче данных используется в тех случаях, когда один блок кода должен оповестить другой блок кода о некотором условии (например, о поступлении нового сообщения). Синхронизация при передаче данных будет более подробно рассмотрена в рецептах этой главы; оставшаяся часть введения будет посвящена защите данных.
Синхронизация должна использоваться для защиты общих данных при выполнении всех трех условий из следующего списка:
• Несколько частей кода выполняются одновременно.
• Эти части кода обращаются (читают или записывают) одни и те же данные.
• По крайней мере одна часть кода обновляет (или записывает) данные.
Причина для первого условия должна быть очевидна; если весь ваш код выполняется от начала к концу и ничего не происходит одновременно, то вообще не нужно беспокоиться о синхронизации. Некоторые простые консольные приложения могут работать по этому принципу, но в большинстве приложений .NET используется та или иная форма конкурентности. Второе условие означает, что если каждый блок кода работает с собственными локальными данными, к которым не могут обращаться другие, то необходимость в синхронизации отсутствует; другие части кода никогда не обращаются к локальным данным. Также необходимость в синхронизации отсутствует и в том случае, если общие данные присутствуют, но никогда не изменяются: например, если данные определяются с использованием неизменяемых типов. Под третье условие подпадают такие сценарии, как данные конфигурации и т.п.; они задаются в начале работы приложения, а затем никогда не изменяются. Если общие данные используются только для чтения, то они не нуждаются в синхронизации; она необходима только для данных, которые одновременно находятся в общем доступе и изменяются.
Цель защиты данных заключается в том, чтобы каждая часть кода имела целостное представление данных. Если одна часть кода обновляет данные, вы можете воспользоваться синхронизацией, чтобы обновления воспринимались как атомарные для остальных компонентов системы.
Вероятно, вы не сразу научитесь понимать, когда нужна синхронизация, поэтому перед тем как переходить к рецептам этой главы, рассмотрим несколько примеров. Начнем со следующего кода:
async Task MyMethodAsync()
{
int value = 10;
await Task.Delay(TimeSpan.FromSeconds(1));
value = value + 1;
await Task.Delay(TimeSpan.FromSeconds(1));
value = value - 1;
await Task.Delay(TimeSpan.FromSeconds(1));
Trace.WriteLine(value);
}
Если метод MyMethodAsync вызывается в потоке из пула потоков (например, из Task.Run), то строки кода, обращающиеся к значению, могут выполняться в разных потоках. Но понадобится ли синхронизация в этом случае? Нет, потому что они не могут выполняться одновременно. Это асинхронный метод, но он также является последовательным (т. е. сначала выполняется одна часть, потом другая и т.д.).
Немного усложним пример. На этот раз будет выполняться конкурентный асинхронный код:
private int value;
async Task ModifyValueAsync()
{
await Task.Delay(TimeSpan.FromSeconds(1));
value = value + 1;
}
// ВНИМАНИЕ: может требовать синхронизации; см. ниже.
async Task<int> ModifyValueConcurrentlyAsync()
{
// Start three concurrent modifications.
Task task1 = ModifyValueAsync();
Task task2 = ModifyValueAsync();
Task task3 = ModifyValueAsync();
await Task.WhenAll(task1, task2, task3);
return value;
}
Этот код запускает три изменения, выполняемых конкурентно. Понадобится ли синхронизация? Зависит от обстоятельств. Если вы знаете, что метод вызывается из GUI-контекста или контекста ASP.NET (или любого контекста, который позволяет выполняться только одному фрагменту кода в любой момент времени), синхронизация будет излишней, потому что при выполнении кода изменения data он будет выполняться в разное время с двумя другими изменениями. Например, если приведенный код выполняется в GUI-контексте, то существует только один UI-поток, который будет выполнять каждое изменение data, поэтому он должен выполнять их по одному. Итак, если вы знаете, что контекст является последовательным (one-at-a-time), то синхронизация не нужна. Но если тот же метод вызывается в потоке из пула потоков (например, из Task.Run), то синхронизация будет необходима. В данном случае три изменения данных могут выполняться в разных потоках из пула и обновлять data.Value одновременно, поэтому доступ к data.Value требуется синхронизировать.
А теперь еще одна вариация:
private int value;
async Task ModifyValueAsync()
{
int originalValue = value;
await Task.Delay(TimeSpan.FromSeconds(1));
value = originalValue + 1;
}
Посмотрим, что произойдет, если ModifyValueAsync вызывается несколько раз конкурентно. Даже при вызове из последовательного контекста поле данных используется совместно всеми вызовами ModifyValueAsync, а значение может измениться в любое время, когда в методе выполняется await. Иногда синхронизация применяется даже в последовательных контекстах для предотвращения общего доступа такого рода. Иначе говоря, если вы хотите добиться того, чтобы каждый вызов ModifyValueAsync ожидал завершения всех предыдущих вызовов, следует добавить синхронизацию. Это справедливо даже в том случае, если контекст гарантирует, что для всего кода используется только один поток (т. е. UI-поток). Синхронизация в этом сценарии является разновидностью регулировки для асинхронных методов (см. раздел 12.2).
Рассмотрим еще один пример с async. Task.Run можно использовать для того, что я называю простым параллелизмом — простейшей разновидностью параллельной обработки, которая не обладает такой эффективностью и возможностями настройки, как истинный параллелизм Parallel/PLINQ. Следующий код обновляет общее значение с использованием простого параллелизма:
// ПЛОХОЙ КОД!!
async Task<int> SimpleParallelismAsync()
{
int value = 0;
Task task1 = Task.Run(() => { value = value + 1; });
Task task2 = Task.Run(() => { value = value + 1; });
Task task3 = Task.Run(() => { value = value + 1; });
await Task.WhenAll(task1, task2, task3);
return value;
}
В этом коде три отдельные задачи выполняются в пуле потоков (через Task.Run), причем все они изменяют одно значение value. Следовательно, условия выполняются, и синхронизация определенно необходима. Обратите внимание: синхронизация нужна даже несмотря на то, что переменная value является локальной; она все равно совместно используется разными потоками, хотя и является локальной для одного метода.
Переходя к настоящему параллельному коду, рассмотрим пример, в котором используется тип Parallel:
void IndependentParallelism(IEnumerable<int> values)
{
Parallel.ForEach(values, item => Trace.WriteLine(item));
}
Так как в коде используется Parallel, необходимо предполагать, что тело параллельного цикла (item =>Trace.WriteLine(item)) может выполняться в нескольких потоках. Однако тело цикла читает только собственные данные; совместного использования данных между потоками здесь не будет. Класс Parallel разделяет данные между потоками, чтобы им не приходилось совместно использовать свои данные. Каждый поток, выполняющий тело цикла, не зависит от всех остальных потоков, выполняющих то же тело цикла. Таким образом, синхронизация в приведенном коде не нужна.
Рассмотрим пример агрегирования, похожий на описанный в рецепте 4.2:
// ПЛОХОЙ КОД!!
int ParallelSum(IEnumerable<int> values)
{
int result = 0;
Parallel.ForEach(source: values,
localInit: () => 0,
body: (item, state, localValue) => localValue + item,
localFinally: localValue => { result += localValue; });
return result;
}
В этом примере код снова использует несколько потоков; на этот раз каждый поток в начале своей работы инициализирует свое локальное значение 0(() => 0), и для каждого входного значения, обработанного потоком, входное значение прибавляется к локальному ((item,state, localValue) =>localValue + item). Наконец, все локальные значения прибавляются к возвращаемому значению (localValue => { result += localValue; }). Первые два шага не создают проблем, потому что потоки не имеют общих данных; локальное и входное значение каждого потока существует независимо от локальных и входных значений всех остальных потоков, но на последнем шаге возникают проблемы; когда локальное значение каждого потока прибавляется к возвращаемому значению, возникает ситуация, в которой сразу несколько потоков обращаются к общей переменной (result) и обновляют ее. Таким образом, на последнем шаге необходимо применить синхронизацию (см. рецепт 12.1).
PLINQ, TPL Dataflow и реактивные библиотеки очень похожи на примеры Parallel: пока ваш код имеет дело только со своим собственным вводом, ему не приходится беспокоиться о синхронизации. По опыту могу сказать, что при правильном использовании этих библиотек добавлять синхронизацию в свой код мне почти никогда не приходится.
И наконец, поговорим о коллекциях. Вспомните три условия, требующие синхронизации: наличие нескольких частей кода, общие данные и обновления данных.
Неизменяемые типы потокобезопасны по своей природе, потому что не могут изменяться; невозможно обновить неизменяемую коллекцию, поэтому синхронизация не нужна. Например, следующий код не требует синхронизации, так как когда каждый отдельный поток из пула потоков заносит значение в стек, он создает новый неизменяемый стек с этим значением, а исходный стек остается без изменений.
async Task<bool> PlayWithStackAsync()
{
ImmutableStack<int> stack = ImmutableStack<int>.Empty;
Task task1 = Task.Run(() => Trace.WriteLine(stack.Push(3).Peek()));
Task task2 = Task.Run(() => Trace.WriteLine(stack.Push(5).Peek()));
Task task3 = Task.Run(() => Trace.WriteLine(stack.Push(7).Peek()));
await Task.WhenAll(task1, task2, task3);
return stack.IsEmpty; // Всегда возвращает true.
}
Когда в вашем коде используются неизменяемые коллекции, обычно в нем также присутствует общая «корневая» переменная, которая сама по себе неизменяемой не является. В этом случае приходится применять синхронизацию. В следующем примере каждый поток заносит значение в стек (создавая новый неизменяемый стек), после чего обновляет общую корневую переменную; для обновления переменной stack синхронизация необходима:
// ПЛОХОЙ КОД!!
async Task<bool> PlayWithStackAsync()
{
ImmutableStack<int> stack = ImmutableStack<int>.Empty;
Task task1 = Task.Run(() => { stack = stack.Push(3); });
Task task2 = Task.Run(() => { stack = stack.Push(5); });
Task task3 = Task.Run(() => { stack = stack.Push(7); });
await Task.WhenAll(task1, task2, task3);
return stack.IsEmpty;
}
С потокобезопасными коллекциями (например, ConcurrentDictionary) дело обстоит иначе. В отличие от неизменяемых коллекций, потокобезопасные коллекции могут обновляться. Но вся необходимая синхронизация уже встроена в них, так что вам не придется беспокоиться о синхронизации изменений в коллекции. Если бы в следующем коде обновлялась коллекция Dictionary вместо ConcurrentDictionary, то синхронизация была бы необходима; но поскольку обновляется ConcurrentDictionary, она становится излишней:
async Task<int> ThreadsafeCollectionsAsync()
{
var dictionary = new ConcurrentDictionary<int, int>();
Task task1 = Task.Run(() => { dictionary.TryAdd(2, 3); });
Task task2 = Task.Run(() => { dictionary.TryAdd(3, 5); });
Task task3 = Task.Run(() => { dictionary.TryAdd(5, 7); });
await Task.WhenAll(task1, task2, task3);
return dictionary.Count; // Всегда возвращает 3.
}
Имеются общие данные. Требуется обеспечить безопасное чтение и запись этих данных из нескольких потоков.
Лучшее решение в такой ситуации — использование команды блокировки lock. Если поток входит в блок lock, то все остальные потоки не смогут войти в этот блок, пока блокировка не будет снята:
class MyClass
{
// Блокировка защищает поле _value.
private readonly object _mutex = new object();
private int _value;
public void Increment()
{
lock (_mutex)
{
_value = _value + 1;
}
}
}
В фреймворке .NET существует несколько механизмов блокировки: Monitor, Spin, Lock и ReaderWriterLockSlim. В большинстве приложений эти типы блокировок практически никогда не должны использоваться напрямую. В частности, для разработчиков проще переключиться на ReaderWriterLockSlim, когда такая сложность не является необходимой. Базовая команда lock нормально справляется с 99 % случаев.
При использовании блокировок следует руководствоваться четырьмя важными рекомендациями:
• Ограничьте видимость блокировки.
• Документируйте, что именно защищает блокировка.
• Сократите до минимума объем кода, защищенного блокировкой.
• Никогда не выполняйте произвольный код при удержании блокировки.
Во-первых, стремитесь к ограничению видимости блокировки. Объект, используемый в команде lock, должен быть приватным полем, которое никогда не должно быть доступным для любых методов за пределами класса. Обычно есть не более одного поля блокировки на тип; если у вас их несколько, рассмотрите возможность рефакторинга этого типа на несколько типов. Блокировка может устанавливаться по любому ссылочному типу, но я предпочитаю создавать отдельное поле специально для команды lock, как в последнем примере. Если вы устанавливаете блокировку по другому экземпляру, убедитесь в том, что он является приватным для вашего класса; он не должен передаваться в конструкторе или возвращаться из get-метода свойства. Никогда не используйте lock(this) или lock с любым экземпляром Type или string; это может привести к взаимоблокировкам, доступным из другого кода.
Во-вторых, документируйте, что именно защищает блокировка. Об этом шаге легко забыть во время первоначального написания кода, но он становится более важным по мере роста сложности кода.
В-третьих, сократите до минимума объем кода, защищенного блокировкой. Один из аспектов, на которые следует обращать внимание, — блокирующие вызовы при удержании блокировок. В идеале их быть вообще не должно.
Наконец, никогда не вызывайте произвольный код при удержании блокировки. Произвольный код может включать выдачу событий, вызов виртуальных методов или вызов делегатов. Если вам потребуется выполнить произвольный код, сделайте это после снятия блокировки.
В рецепте 12.2 рассматриваются async-совместимые блокировки. Команда lock несовместима с await.
В рецепте 12.3 рассматривается передача сигналов между потоками. Команда lock предназначена для защиты общих данных, а не для отправки сигналов между потоками.
В рецептах 12.5 рассматривается регулировка, которая представляет собой обобщенную форму блокировки. Блокировка может рассматриваться как регулировка до уровня 1.
Имеются общие данные. Требуется обеспечить безопасное чтение и запись этих данных из разных программных блоков, которые могут использовать await.
Тип SemaphoreSlim из фреймворка .NET был обновлен в .NET 4.5 для обеспечения совместимости с async. Пример использования:
class MyClass
{
// Блокировка защищает поле _value.
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
private int _value;
public async Task DelayAndIncrementAsync()
{
await _mutex.WaitAsync();
try
{
int oldValue = _value;
await Task.Delay(TimeSpan.FromSeconds(oldValue));
_value = oldValue + 1;
}
finally
{
_mutex.Release();
}
}
}
Также можно воспользоваться типом AsyncLock из библиотеки Nito.AsyncEx, который обладает чуть более элегантным API:
class MyClass
{
// Блокировка защищает поле _value.
private readonly AsyncLock _mutex = new AsyncLock();
private int _value;
public async Task DelayAndIncrementAsync()
{
using (await _mutex.LockAsync())
{
int oldValue = _value;
await Task.Delay(TimeSpan.FromSeconds(oldValue));
_value = oldValue + 1;
}
}
}
В этой ситуации действуют рекомендации из рецепта 12.1:
• Ограничьте видимость блокировки.
• Документируйте, что именно защищает блокировка.
• Сократите до минимума объем кода, защищенного блокировкой.
• Никогда не выполняйте произвольный код при удержании блокировки.
Экземпляры блокировок должны быть приватными; они не должны быть доступными за пределами класса. Обязательно четко документируйте (и тщательно продумывайте), что именно защищает экземпляр блокировки. Сведите к минимуму объем кода, выполняемого при удержании блокировки. В частности, не вызывайте произвольный код, включая выдачу событий, вызов виртуальных методов и вызов делегатов.
Тип AsyncLock находится в пакете Nito.AsyncEx.
В рецепте 12.4 рассматриваются async-совместимые сигналы. Блокировки предназначены для защиты общих данных, а не для передачи сигналов.
В рецепте 12.5 рассматривается регулировка, которая представляет собой обобщенную форму. Блокировка может рассматриваться как регулировка до уровня 1.
Требуется отправить уведомление от одного потока другому.
Самый распространенный и универсальный межпотоковый сигнал — событие с ручным сбросом ManualResetEventSlim. Событие с ручным сбросом может находиться в одном из двух состояний: установленном или сброшенном. Любой поток может перевести поток в установленное состояние или провести его сброс. Поток также может ожидать перехода события в установленное состояние.
Следующие два метода вызываются разными потоками; один поток ожидает сигнала от другого:
class MyClass
{
private readonly ManualResetEventSlim _initialized =
new ManualResetEventSlim();
private int _value;
public int WaitForInitialization()
{
_initialized.Wait();
return _value;
}
public void InitializeFromAnotherThread()
{
_value = 13;
_initialized.Set();
}
}
ManualResetEventSlim — отличный универсальный сигнал, передаваемый одним потоком другому, однако использовать его следует только тогда, когда это действительно уместно. Если «сигнал» представляет собой сообщение, отправляющее некоторые данные между потоками, рассмотрите возможность использования очереди «производитель/потребитель». С другой стороны, если сигналы используются только для координации доступа к общим данным, лучше использовать lock.
В фреймворке .NET существуют и другие разновидности сигналов синхронизации потоков, которые используются реже. Если ManualResetEventSlim не подходит для ваших потребностей, подумайте об использовании AutoResetEvent, CountdownEvent или Barrier.
ManualResetEventSlim является синхронным сигналом, поэтому WaitForInitialization блокирует вызывающий поток до отправки сигнала. Если вы хотите ожидать сигнала без приостановки потока, используйте асинхронный сигнал так, как описано в рецепте 12.4.
В рецепте 9.6 рассматриваются блокирующие очереди «производитель/потребитель».
В рецепте 12.1 рассматривается команда lock.
В рецепте 12.4 рассматриваются async-совместимые сигналы.
Требуется отправить уведомление от одного потока другому, при этом получатель оповещения должен ожидать его асинхронно.
Используйте TaskCompletionSource<T> для того, чтобы отправить уведомление асинхронно, если уведомление должно быть отправлено только один раз. Код-отправитель вызывает TrySetResult, а код-получатель ожидает его свойство Task:
class MyClass
{
private readonly TaskCompletionSource<object> _initialized =
new TaskCompletionSource<object>();
private int _value1;
private int _value2;
public async Task<int> WaitForInitializationAsync()
{
await _initialized.Task;
return _value1 + _value2;
}
public void Initialize()
{
_value1 = 13;
_value2 = 17;
_initialized.TrySetResult(null);
}
}
Тип TaskCompletionSource<T> может использоваться для асинхронного ожидания любой ситуации — в данном случае уведомления от другой части кода. Этот способ хорошо работает, если сигнал отправляется только один раз, но совершенно не работает, если сигнал нужно не только включать, но и отключать.
Библиотека Nito.AsyncEx содержит тип AsyncManualResetEvent — приблизительный аналог ManualResetEvent для асинхронного кода. Следующий пример является искусственным, но показывает, как правильно использовать тип AsyncManualResetEvent:
class MyClass
{
private readonly AsyncManualResetEvent _connected =
new AsyncManualResetEvent();
public async Task WaitForConnectedAsync()
{
await _connected.WaitAsync();
}
public void ConnectedChanged(bool connected)
{
if (connected)
_connected.Set();
else
_connected.Reset();
}
}
Сигналы представляют собой механизм уведомлений общего назначения. Но если этот «сигнал» представляет собой сообщение, используемое для отправки данных от одной части кода в другую, рассмотрите возможность использования очереди «производитель/потребитель». Не стоит использовать и сигналы общего назначения для простой координации доступа к общим данным; в таких ситуациях следует применять асинхронную блокировку.
Тип AsyncManualResetEvent находится в пакете Nito.AsyncEx.
В рецепте 9.8 рассматриваются асинхронные очереди «производитель/потребитель».
В рецепте 12.2 рассматриваются асинхронные блокировки.
В рецепте 12.3 рассматриваются блокирующие сигналы, которые могут использоваться для передачи уведомлений между потоками.
Имеется код с высокой степенью конкурентности — даже слишком высокой. Требуется найти способ скорректировать конкурентность.
Конкурентность оказывается чрезмерной, если части приложения не успевают друг за другом, элементы данных накапливаются и занимают память. В этом сценарии регулировка частей кода может предотвратить проблемы с памятью.
Решение зависит от типа конкурентности, используемой в вашем коде. Все представленные решения ограничивают конкурентность конкретным значением. В Reactive Extensions предусмотрены более разнообразные возможности — например, скользящие временные окна; регулировка по наблюдаемым объектам System.Reactive более подробно рассматривается в рецепте 6.4.
В Dataflow и в параллельном коде существуют встроенные параметры для регулировки степени конкурентности:
IPropagatorBlock<int, int> DataflowMultiplyBy2()
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
return new TransformBlock<int, int>(data => data * 2, options);
}
// Использование Parallel LINQ (PLINQ)
IEnumerable<int> ParallelMultiplyBy2(IEnumerable<int> values)
{
return values.AsParallel()
.WithDegreeOfParallelism(10)
.Select(item => item * 2);
}
// Использование класса Parallel
void ParallelRotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 10
};
Parallel.ForEach(matrices, options, matrix => matrix.Rotate(degrees));
}
Конкурентный асинхронный код может регулироваться с помощью SemaphoreSlim:
async Task<string[]> DownloadUrlsAsync(HttpClient client,
IEnumerable<string> urls)
{
using var semaphore = new SemaphoreSlim(10);
Task<string>[] tasks = urls.Select(async url =>
{
await semaphore.WaitAsync();
try
{
return await client.GetStringAsync(url);
}
finally
{
semaphore.Release();
}
}).ToArray();
return await Task.WhenAll(tasks);
}
Регулировка может быть необходимой тогда, когда вы обнаруживаете, что код задействует слишком много ресурсов (например, процессорного времени или сетевых подключений). Учтите, что конечные пользователи обычно работают на машинах, менее мощных, чем у разработчиков, поэтому чрезмерная регулировка обычно лучше недостаточной.
В рецепте 6.4 рассматривается регулировка реактивного кода.