Книга: Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.
Назад: Глава 8. Взаимодействие
Дальше: Глава 10. Отмена

Глава 9. Коллекции

Правильный выбор коллекций чрезвычайно важен для конкурентных приложений. Речь не о стандартных коллекциях вроде List<T>; полагаю, вы о них уже знаете. В этой главе я хочу представить новые коллекции, предназначенные специально для конкурентного или асинхронного использования.

Неизменяемые коллекции представляют собой экземпляры коллекций, которые не могут изменяться ни при каких условиях. На первый взгляд может показаться, что они абсолютно бесполезны; но неизменяемые коллекции чрезвычайно полезны даже в однопоточных, неконкурентных приложениях. Операции, доступные только для чтения (например, перечисление) выполняются с неизменяемым экземпляром напрямую. Операции записи (например, добавление элемента) возвращают новый неизменяемый экземпляр вместо изменения существующего экземпляра. Это не настолько неэффективно, как может показаться, потому что в большинстве случаев неизменяемые коллекции совместно используют бˆольшую часть своей памяти. Кроме того, неизменяемые коллекции обладают таким преимуществом, как неявная безопасность обращения из разных потоков; если коллекции не могут изменяться, то они являются потокобезопасными.

lemur.tiff

Неизменяемые коллекции находятся в NuGet-пакете System.Collections.Immutable.

Неизменяемые коллекции — относительно новое явление, и стоит применять их в новых разработках, только если вам не нужен изменяемый экземпляр. Если вы не знакомы с неизменяемыми коллекциями, рекомендую начать с рецепта 9.1; даже если вам не нужен именно стек или очередь — в этом рецепте рассматриваются некоторые общие паттерны, задействованные для всех неизменяемых коллекций.

Есть специальные способы более эффективного конструирования неизменяемых коллекций с большим количеством существующих элементов; в примерах из этих рецептов элементы добавляются по одному. В документации MSDN содержится подробная информация об эффективном создании неизменяемых коллекций, если вам понадобится ускорить процесс инициализации.

• Потокобезопасные коллекции. Эти изменяемые экземпляры коллекций могут изменяться несколькими потоками одновременно. Потокобезопасные коллекции используют сочетание детализированных блокировок и приемов, не использующих блокировки, которое гарантирует, что потоки будут блокироваться на минимальное время (а обычно не блокируются вовсе). В случае потокобезопасных коллекций при перечислении коллекции создается ее снимок, после чего перечисление выполняется с этим снимком. Ключевое преимущество потокобезопасных коллекций — возможность безопасного обращения к ним из нескольких потоков при том, что операции будут блокировать ваш код на минимальное время (или не блокировать вообще).

• Коллекции «производитель/потребитель». Эти экземпляры изменяемых коллекций проектировались с конкретной целью: разрешить (возможно, нескольким) производителям заносить элементы в коллекцию, в то время как потребители (которых тоже может быть несколько) извлекают элементы из коллекции. Таким образом, коллекции играют роль моста между кодом производителя и кодом потребителя с дополнительной возможностью ограничить количество элементов в коллекции. Коллекции «производитель/потребитель» также могут иметь блокирующий или асинхронный API. Например, если коллекция пуста, блокирующая коллекция «производитель/потребитель» блокирует вызывающий поток-потребитель до того момента, когда будет добавлен новый элемент; с другой стороны, асинхронная коллекция «производитель/потребитель» позволяет вызывающему потоку асинхронно ожидать добавления нового элемента.

В рецептах этой главы используются разные коллекции «производитель/потребитель», обладающие разными преимуществами. Таблица 9.1 поможет определиться, какую коллекцию стоит использовать в конкретном случае.

Таблица 9.1. Коллекции «производитель/потребитель»

Признак

Channels

Blocking­Collection<T>

Buffer­Block<T>

AsyncProducer­ConsumerQueue<T>

Async­Collection<T>

Семантика очереди

ü

ü

ü

ü

ü

Семантика стека/мультимножества

×

ü

×

×

ü

Синхронный API

ü

ü

ü

ü

ü

Асинхронный API

ü

×

ü

ü

ü

Потеря элементов при переполнении

ü

×

×

×

×

Протестирован Microsoft

ü

ü

ü

×

×

lemur.tiff

Библиотека Channels находится в пакете System.Threa­ding.Channels, BufferBlock<T> — в пакете System.Threa­ding.Tasks.Dataflow, а AsyncProducerConsumerQueue<T> и Async­Collection<T> — в пакете Nito.AsyncEx.

9.1. Неизменяемые стеки и очереди

Задача

Вам нужна коллекция — стек или очередь, которая изменяется не очень часто и к которой можно безопасно обращаться из нескольких потоков.

Например, очередь может использоваться для представления последовательности выполняемых операций, а стек — для представления последовательности операций отмены.

Решение

Простейшие неизменяемые коллекции — неизменяемые стеки и очереди. По своему поведению они очень близки к стандартным коллекциям Stack<T> и Queue<T>. В отношении быстродействия неизменяемые стеки и очереди обладают практически такой же временной сложностью, что и стандартные стеки и очереди; впрочем, в простых сценариях с частым обновлением коллекций стандартные стеки и очереди работают быстрее.

Стеки относятся к категории LIFO («Last In, First Out», т.е. «последним зашел, первым вышел»). Следующий пример создает пустой неизменяемый стек, заносит в него два элемента, перебирает элементы, после чего извлекает элемент из стека:

ImmutableStack<int> stack = ImmutableStack<int>.Empty;

stack = stack.Push(13);

stack = stack.Push(7);

 

// Выводит "7", затем "13".

foreach (int item in stack)

  Trace.WriteLine(item);

 

int lastItem;

stack = stack.Pop(out lastItem);

// lastItem == 7

Обратите внимание: в этом примере многократно перезаписывается локальная переменная stack. Неизменяемые коллекции строятся на основе паттерна, в соответствии с которым они возвращают обновленную коллекцию; ссылка на исходную коллекцию остается без изменений. Это озна­чает, что если имеется ссылка на конкретный экземпляр неизменяемой коллекции, она никогда не изменится. Рассмотрим следующий пример:

ImmutableStack<int> stack = ImmutableStack<int>.Empty;

stack = stack.Push(13);

ImmutableStack<int> biggerStack = stack.Push(7);

 

// Выводит "7", затем "13".

foreach (int item in biggerStack)

  Trace.WriteLine(item);

 

// Выводит только "13".

foreach (int item in stack)

  Trace.WriteLine(item);

Во внутренней реализации два стека совместно используют память, выделенную для хранения элемента 13. Такая реализация весьма эффективна, к тому же она позволяет легко создавать снимки текущего состояния. Каждый экземпляр неизменяемой коллекции потокобезопасен по своей природе, но неизменяемые коллекции также могут использоваться в однопоточных приложениях. По моему опыту, неизменяемые коллекции особенно удобны при использовании функционального кода, а также при необходимости хранить большое количество снимков коллекции, которые должны по возможности совместно использовать одну память.

Очереди похожи на стеки, но они относятся к категории структур FIFO («First In, First Out», т.е. «первым зашел, первым вышел»). Следующий пример создает пустую неизменяемую очередь, помещает в очередь два элемента, перебирает элементы, а затем извлекает элемент из очереди:

ImmutableQueue<int> queue = ImmutableQueue<int>.Empty;

queue = queue.Enqueue(13);

queue = queue.Enqueue(7);

 

// Выводит "13", затем "7".

foreach (int item in queue)

  Trace.WriteLine(item);

int nextItem;

queue = queue.Dequeue(out nextItem);

 

// Выводит "13".

Trace.WriteLine(nextItem);

Пояснение

В этом рецепте представлены две простейшие неизменяемые коллекции — стек и очередь. В нем также изложены некоторые важные принципы проектирования, справедливые для всех неизменяемых коллекций:

• Экземпляр неизменяемой коллекции никогда не изменяется.

• Так как экземпляр никогда не изменяется, он потокобезопасен по своей природе.

• При вызове изменяющего метода для неизменяемой коллекции возвращается новая измененная коллекция.

scorp.tiff

Неизменяемые коллекции являются потокобезопасными, но ссылки на них потокобезопасными не являются. Переменная, ссылающаяся на неизменяемую коллекцию, нуждается в такой же синхронизационной защите, как и любая другая переменная (см. главу 12).

Неизменяемые коллекции идеально подходят для хранения общего состояния. С другой стороны, в качестве коммуникационного канала они работают не так хорошо. В частности, неизменяемые очереди не следует использовать для передачи данных между потоками; очереди «производитель/потребитель» подходят для этой цели намного лучше.

lemur.tiff

ImmutableStack<T> и ImmutableQueue<T> находятся в пакете System.Collections.Immutable.

Дополнительная информация

В рецепте 9.6 рассматриваются потокобезопасные (блокирующие) изменяемые очереди.

В рецепте 9.7 рассматриваются потокобезопасные (блокирующие) изменяемые стеки.

В рецепте 9.8 рассматриваются async-совместимые изменяемые очереди.

В рецепте 9.11 рассматриваются async-совместимые изменяемые стеки.

В рецепте 9.12 рассматриваются блокирующие/асинхронные изменяемые очереди.

9.2. Неизменяемые списки

Задача

Нужна структура данных с возможностью индексирования, которая изменяется не слишком часто и допускает безопасные обращения из нескольких потоков.

Решение

Список — структура данных общего назначения, которая может использоваться для хранения разнообразных данных состояния приложения. Неизменяемые списки поддерживают индексирование, однако вы должны учитывать их характеристики быстродействия. Они не должны рассматриваться как тривиальная замена для List<T>.

ImmutableList<T> поддерживает примерно те же методы, что и List<T>, как показывают следующие примеры:

ImmutableList<int> list = ImmutableList<int>.Empty;

list = list.Insert(0, 13);

list = list.Insert(0, 7);

 

// Выводит "7", затем "13".

foreach (int item in list)

  Trace.WriteLine(item);

 

list = list.RemoveAt(1);

Во внутренней реализации неизменяемого списка используется двоичное дерево, чтобы экземпляры неизменяемого списка могли максимизировать объем памяти, используемый совместно с другими экземплярами. В результате для некоторых распространенных операций существуют различия в быстродействии между ImmutableList<T> и List<T> (табл. 9.2).

Таблица 9.2. Различия в быстродействии для неизменяемых списков

Операция

List<T>

ImmutableList<T>

Add

Амортизированная O(1)

O(log N)

Insert

O(N)

O(log N)

RemoveAt

O(N)

O(log N)

Item[индекс]

O(1)

O(log N)

Стоит отметить, что операция индексирования для ImmutableList<T> обладает сложностью O(log N), а не O(1), как можно было бы ожидать. Если вы заменяете List<T> на ImmutableList<T> в существующем коде, следует учесть, как ваши алгоритмы обращаются к элементам коллекции.

Это означает, что следует использовать foreach вместо for там, где это возможно. Цикл foreach по ImmutableList<T> выполняется за время O(N), тогда как цикл for  по той же коллекции выполняется за время O(N * log N):

// Лучший способ перебора ImmutableList<T>.

foreach (var item in list)

  Trace.WriteLine(item);

// Тоже будет работать, но намного медленнее.

 

for (int i = 0; i != list.Count; ++i)

  Trace.WriteLine(list[i]);

Пояснение

ImmutableList<T> — хорошая структура данных общего назначения, но из-за различий в быстродействии вы не сможете бездумно заменить ей все List<T>. List<T> часто используется по умолчанию — именно эту структуру данных следует использовать, если только у вас нет веских причин для выбора другой коллекции. Коллекция ImmutableList<T> не настолько распространена; следует тщательно проанализировать другие неизменяемые коллекции и выбрать ту, которая лучше всего подходит для вашей ситуации.

lemur.tiff

ImmutableList<T> находится в пакете System.Collections.Immutable.

Дополнительная информация

В рецепте 9.1 рассматриваются неизменяемые стеки и очереди — структуры данных, сходные со списками, но ограничивающие доступ некоторым элементам.

В документации ImmutableList<T>.Builder в MSDN () рассматривается эффективный способ заполнения неизменяемых списков.

9.3. Неизменяемые множества

Задача

Нужна структура данных, не рассчитанная на хранение дубликатов, которая не слишком часто изменяется и допускает безопасные обращения из нескольких потоков.

Например, индекс слов из файла может быть хорошим кандидатом для применения множества.

Решение

Существует два типа неизменяемых множеств: ImmutableHashSet<T> — коллекция уникальных элементов и ImmutableSortedSet<T> — отсортированная коллекция уникальных элементов. Типы обладают похожим интерфейсом:

ImmutableHashSet<int> hashSet = ImmutableHashSet<int>.Empty;

hashSet = hashSet.Add(13);

hashSet = hashSet.Add(7);

 

// Выводит "7" и "13" в непредсказуемом порядке.

foreach (int item in hashSet)

  Trace.WriteLine(item);

 

hashSet = hashSet.Remove(7);

Только отсортированное множество допускает индексирование по аналогии со списком:

ImmutableSortedSet<int> sortedSet = ImmutableSortedSet<int>.Empty;

sortedSet = sortedSet.Add(13);

sortedSet = sortedSet.Add(7);

 

// Выводит "7", затем "13".

foreach (int item in sortedSet)

  Trace.WriteLine(item);

int smallestItem = sortedSet[0];

// smallestItem == 7

sortedSet = sortedSet.Remove(7);

Несортированные и отсортированные множества обладают сходным быстродействием (табл. 9.3).

Рекомендую использовать несортированное множество, если только вы не уверены в том, что оно должно быть отсортированным. Многие типы поддерживают только базовое равенство, но не полное сравнение, так что несортированное множество может использоваться для большего количества типов, чем отсортированное множество.

Таблица 9.3. Быстродействие неизменяемых множеств

Операция

ImmutableHashSet<T>

ImmutableSortedSet<T>

Add

O(log N)

O(log N)

Remove

O(log N)

O(log N)

Item[индекс]

O(log N)

Одно важное примечание по поводу отсортированных множеств: индексирование для них выполняется за время O(log N), а не O(1), как у ImmutableList<T> (см. рецепт 9.2). Это означает, что в данной ситуации действует та же рекомендация: используйте foreach вместо for там, где это возможно, с ImmutableSortedSet<T>.

Пояснение

Неизменяемые множества полезны, но заполнение большого неизменяемого множества может быть медленной операцией. У многих неизменяемых коллекций имеются специальные построители, которые могут использоваться для быстрого их построения в изменяемом виде с последующим преобразованием в неизменяемую коллекцию. Это относится ко многим неизменяемым коллекциям, но, на мой взгляд, они особенно полезны для неизменяемых множеств.

lemur.tiff

ImmutableHashSet<T> и  ImmutableSortedSet<T> находятся в пакете System.Collections.Immutable.

Дополнительная информация

В рецепте 9.7 рассматриваются потокобезопасные изменяемые мульти­множества, сходные с множествами.

В рецепте 9.11 рассматриваются async-совместимые изменяемые мультимножества.

В документации ImmutableHashSet<T>.Builder в MSDN () рассматривается эффективный способ заполнения неизменяемых хешированных множеств.

В документации ImmutableSortedSet<T>.Builder в MSDN () рассматривается эффективный способ заполнения неизменяемых отсортированных множеств.

9.4. Неизменяемые словари

Задача

Нужна коллекция «ключ/значение», которая не слишком часто изменяется и допускает безопасные обращения из нескольких потоков. Например, в этой коллекции могут храниться данные ссылок в подстановочной таблице; данные ссылок редко изменяются, но они должны быть доступны для разных потоков.

Решение

Есть два типа неизменяемых множеств: ImmutableDictionary<TKey, TValue> и ImmutableSortedDictionary<TKey, TValue>.  Как нетрудно догадаться по именам, если элементы ImmutableDictionary следуют в непредсказуемом порядке, ImmutableSortedDictionary гарантирует, что его элементы следуют в порядке сортировки.

Эти типы коллекций имеют очень похожие составляющие:

ImmutableDictionary<int, string> dictionary =

    ImmutableDictionary<int, string>.Empty;

dictionary = dictionary.Add(10, "Ten");

dictionary = dictionary.Add(21, "Twenty-One");

dictionary = dictionary.SetItem(10, "Diez");

 

// Выводит "10Diez" и "21Twenty-One" в непредсказуемом порядке.

foreach (KeyValuePair<int, string> item in dictionary)

  Trace.WriteLine(item.Key + item.Value);

 

string ten = dictionary[10];

// ten == "Diez"

 

dictionary = dictionary.Remove(21);

Обратите внимание на использование SetItem. В изменяемом словаре можно было бы попытаться использовать конструкцию вида словарь[ключ] = элемент, но неизменяемые словари должны возвращать обновленный неизменяемый словарь, поэтому вместо этого они должны использовать метод SetItem:

ImmutableSortedDictionary<int, string> sortedDictionary =

    ImmutableSortedDictionary<int, string>.Empty;

sortedDictionary = sortedDictionary.Add(10, "Ten");

sortedDictionary = sortedDictionary.Add(21, "Twenty-One");

sortedDictionary = sortedDictionary.SetItem(10, "Diez");

 

// Выводит "10Diez", затем "21Twenty-One".

foreach (KeyValuePair<int, string> item in sortedDictionary)

  Trace.WriteLine(item.Key + item.Value);

 

string ten = sortedDictionary[10];

// ten == "Diez"

sortedDictionary = sortedDictionary.Remove(21);

Несортированные и отсортированные словари обладают сходным быстродействием, но я рекомендую использовать неупорядоченные словари, если только не требуется, чтобы элементы были отсортированы (табл. 9.4). Несортированные словари могут работать в целом немного быстрее. Кроме того, несортированные словари могут использоваться с любыми типами ключей, тогда как отсортированные словари требуют полной совместимости типов их ключей.

Таблица 9.4. Быстродействие неизменяемых словарей

Операция

ImmutableDictionary<TK,TV>

ImmutableSortedDictionary<TK,TV>

Add

O(log N)

O(log N)

SetItem

O(log N)

O(log N)

Item[key]

O(log N)

O(log N)

Remove

O(log N)

O(log N)

Пояснение

По опыту скажу, что словари являются полезным и общепринятым инструментом при работе с состоянием приложения. Они могут использоваться в любых сценариях, связанных с ключами/значениями или подстановками.

Неизменяемые словари, как и другие неизменяемые коллекции, поддерживают механизм для эффективного построения словарей, содержащих большое количество элементов. Например, если исходные ссылочные данные загружаются в начале работы программы, вы сможете воспользоваться механизмом построителей для конструирования исходного неизменяемого словаря. С другой стороны, если ссылочные данные строятся постепенно во время выполнения, вероятно, можно будет воспользоваться обычным методом Add неизменяемых словарей.

lemur.tiff

ImmutableDictionary<TK,TV> и ImmutableSortedDictionary<TK,TV> находятся в пакете System.Collections.Immutable.

Дополнительная информация

В рецепте 9.5 рассматриваются потокобезопасные изменяемые словари.

В документации ImmutableDictionary<TK,TV>.Builder в MSDN () рассматривается эффективный способ заполнения неизменяемых словарей.

В документации ImmutableSortedDictionary<TK,TV>.Builder в MSDN () рассматривается эффективный способ заполнения неизменяемых отсортированных словарей.

9.5. Потокобезопасные словари

Задача

Имеется коллекция «ключ/значение» (например, кэш в памяти), которая должна поддерживаться в синхронизированном состоянии, даже если несколько потоков выполняют с ней операции чтения и записи.

Решение

Тип ConcurrentDictionary<TKey,TValue> в фреймворке .NET — настоящее сокровище среди структур данных. Он является потокобезопасным и использует сочетание детализированных блокировок и приемов, не использующих блокировки, которое гарантирует быстрый доступ в подавляющем большинстве сценариев.

Вероятно, понадобится какое-то время, чтобы привыкнуть к этому API. Он сильно отличается от стандартного типа Dictionary<TKey,TValue>, поскольку должен иметь дело с конкурентным доступом из многих потоков. Но после того как вы ознакомитесь с основами из этого рецепта, поймете, что ConcurrentDictionary<TKey,TValue> — один из самых полезных типов коллекций.

Для начала посмотрим, как записать значение в коллекцию. Чтобы задать значение для ключа, используйте метод AddOrUpdate:

var dictionary = new ConcurrentDictionary<int, string>();

string newValue = dictionary.AddOrUpdate(0,

    key => "Zero",

    (key, oldValue) => "Zero");

Метод AddOrUpdate выглядит сложно, так как должен делать несколько вещей в зависимости от текущего содержимого конкурентного словаря. В первом аргументе метода передается ключ. Во втором аргументе передается делегат, преобразующий ключ (в данном случае 0) в значение, которое будет добавлено в словарь (в данном случае "Zero"). Этот делегат вызывается только в том случае, если ключ не существует в словаре. В третьем аргументе передается еще один делегат, преобразующий ключ (0) и старое значение в обновленное значение, которое должно быть сохранено в словаре ("Zero"). Этот делегат вызывается в том случае, если ключ уже существует в словаре. AddOrUpdate возвращает новое значение для этого ключа (то же значение, которое было возвращено одним из делегатов).

А теперь начинается то, от чего действительно голова может пойти кругом: чтобы конкурентный словарь работал правильно, может оказаться, что метод AddOrUpdate должен вызвать одного (или обоих) делегатов несколько раз. Такое бывает очень редко, но возможно. А значит, ваши делегаты должны быть простыми и быстрыми и не должны иметь побочных эффектов. Следовательно, делегаты должны только создавать значение, не изменяя никакие другие переменные в вашем приложении. Этот принцип распространяется на всех делегатов, передаваемых методом Concurrent­Dictionary<TKey,TValue>.

Есть несколько других способов добавления значений в словари. Один из упрощенных вариантов просто использует синтаксис индексирования:

// Используется тот же словарь "dictionary".

// Добавляет (или обновляет) ключ 0, связывая с ним значение "Zero".

dictionary[0] = "Zero";

Синтаксис индексирования обладает меньшими возможностями; он не предоставляет возможности обновления значений на основании существую­щего значения. Впрочем, этот синтаксис проще и он нормально работает, если вам уже известно значение, которое требуется сохранить в словаре.

Теперь посмотрим, как выполняется чтение значений. Это легко делается методом TryGetValue:

// Используется тот же словарь "dictionary".

bool keyExists = dictionary.TryGetValue(0, out string currentValue);

TryGetValue вернет true и задаст значение, если ключ был найден в словаре. Если ключ не найден, TryGetValue вернет false. Синтаксис индексирования также может использоваться для чтения значений, но я считаю, что в данной ситуации он уже не столь полезен, потому что при отсутствии ключа будет выдано исключение. Помните, что в конкурентном словаре несколько потоков могут заниматься чтением, обновлением, добавлением и удалением значений; во многих ситуациях бывает трудно проверить, существует ключ или нет, до того как вы попытаетесь прочитать его.

Удаление значений выполняется так же просто, как и их чтение:

// Используется тот же словарь "dictionary".

bool keyExisted = dictionary.TryRemove(0, out string removedValue);

Метод TryRemove почти идентичен TryGetValue, не считая того, что он удаляет пару «ключ/значение», если ключ был обнаружен в словаре.

Пояснение

Хотя тип ConcurrentDictionary<TKey,TValue> является потокобезопасным, это не означает атомарности его операций. Если несколько потоков вызывают AddOrUpdate конкурентно, может оказаться, что два потока обнаружат отсутствие ключа, а затем оба одновременно выполнят своего делегата, создающего новое значение.

Я считаю, что ConcurrentDictionary<TKey,TValue> — замечательный тип прежде всего из-за невероятно мощного метода AddOrUpdate. Тем не менее он подходит не для каждой ситуации. ConcurrentDictionary<TKey,TValue>  хорошо работает при чтении и записи со стороны нескольких потоков в общую коллекцию. Если обновления не выполняются постоянно (т. е. эта операция относительно редка), то, возможно, ImmutableDictionary<TKey, TValue> будет более подходящим кандидатом.

Тип ConcurrentDictionary<TKey,TValue> лучше подходит для ситуаций с общими данными, когда несколько потоков совместно используют одну коллекцию. Если некоторые потоки только добавляют элементы, а другие только удаляют их, возможно, вам лучше подойдет коллекция «производитель/потребитель».

ConcurrentDictionary<TKey,TValue> — не единственная потокобезопасная коллекция. BCL также предоставляет типы ConcurrentStack<T>,  ConcurrentQueue<T> и ConcurrentBag<T>.

Потокобезопасные коллекции часто используются в качестве коллекций «производитель/потребитель», которые будут рассмотрены далее в этой главе.

Дополнительная информация

В рецепте 9.4 рассматриваются неизменяемые словари, идеально подходящие для ситуаций, в которых содержимое словаря изменяется очень редко.

9.6. Блокирующие очереди

Задача

Необходимо создать коммуникационный канал для передачи сообщений или данных между потоками. Например, один поток может загружать данные, которые отправляются по каналу по мере загрузки; другие потоки на стороне получения получают эти данные и обрабатывают их.

Решение

Тип .NET BlockingCollection<T> проектировался для создания таких коммуникационных каналов. По умолчанию BlockingCollection<T> работает в режиме блокирующей очереди и предоставляет поведение «первым зашел, первым вышел».

Блокирующая очередь должна совместно использоваться несколькими потоками, и обычно определяется как приватное поле, доступное только для чтения:

private readonly BlockingCollection<int> _blockingQueue =

    new BlockingCollection<int>();

Обычно поток делает что-то одно: либо добавляет элементы в коллекцию, либо удаляет элементы. Потоки, добавляющие элементы, называются потоками-производителями, а потоки, удаляющие элементы, называются потоками-потребителями.

Потоки-производители могут добавлять элементы вызовами Add, а когда поток-производитель завершится (когда будут добавлены все элементы), он может завершить коллекцию вызовом CompleteAdding. Тем самым он уведомляет коллекцию о том, что элементы далее добавляться не будут, а коллекция может сообщить своим потребителям, что элементов больше не будет.

В следующем простом примере производитель добавляет два элемента, а потом помечает коллекцию как завершенную:

_blockingQueue.Add(7);

_blockingQueue.Add(13);

_blockingQueue.CompleteAdding();

Потоки-потребители обычно выполняются в цикле, ожидая следующего элемента и выполняя его последующую обработку. Если выделить код производителя в отдельный поток (например, вызовом Task.Run), то эти элементы можно будет потреблять следующим образом:

// Выводит "7", затем "13".

foreach (int item in _blockingQueue.GetConsumingEnumerable())

  Trace.WriteLine(item);

Если потребителей должно быть несколько, GetConsumingEnumerable может вызываться из нескольких потоков одновременно. Тем не менее каждый элемент передается только одному из этих потоков. При завершении коллекции завершается и перечисляемый объект.

Пояснение

Во всех приведенных примерах GetConsumingEnumerable используется для потоков-потребителей; это самая распространенная ситуация. Но существует и метод Take, который позволяет потребителю получить только один элемент (вместо потребления всех элементов в цикле).

При использовании таких коммуникационных каналов необходимо подумать о том, что произойдет, если производители работают быстрее потребителей. Если элементы производятся быстрее, чем потребляются, возможно, придется применить регулировку очереди.

Блокирующие очереди хорошо работают при наличии отдельного потока (например, из пула потоков), действующего как производитель или потребитель. Они не настолько хороши, если вы хотите обращаться к коммуникационному каналу асинхронно — например, если UI-поток должен действовать в режиме потребителя. Асинхронные очереди рассматриваются в рецепте 9.8.

lemur.tiff

Если вы вводите в свое приложение подобный коммуникационный канал, подумайте о переходе на библиотеку TPL Dataflow. Во многих случаях решение с использованием TPL Dataflow проще самостоятельного построения коммуникационных каналов и фоновых потоков.

Тип BufferBlock<T> из TPL Dataflow может работать как блокирующая очередь, к тому же TPL Dataflow позволяет построить конвейер или сеть для обработки. Впрочем, во многих простых случаях обычные блокирующие очереди (например, BlockingCollection<T>) станут более подходящим вариантом при проектировании.

Также можно воспользоваться типом AsyncProducerConsumerQueue<T> библиотеки AsyncEx, который может работать как блокирующая очередь.

Дополнительная информация

В рецепте 9.7 рассматриваются блокирующие стеки и мультимножества на случай, если вам потребуются сходные коммуникационные каналы без семантики «первым зашел, первым вышел».

В рецепте 9.8 рассматриваются очереди, имеющие асинхронный API вместо блокирующего.

В рецепте 9.12 рассматриваются очереди, имеющие как асинхронный, так и блокирующий API.

В рецепте 9.9 рассматриваются очереди с регулировкой количества элементов.

9.7. Блокирующие стеки и мультимножества

Задача

Требуется коммуникационный канал для передачи сообщений или данных из одного потока в другой, но вы не хотите, чтобы этот канал использовал семантику FIFO.

Решение

Тип .NET BlockingCollection<T> по умолчанию работает как блокирующая очередь, но он также может работать как любая другая коллекция «производитель/потребитель». По сути это обертка для потокобезопасной коллекции, реализующей IProducerConsumerCollection<T>.

Таким образом, вы можете создать BlockingCollection<T> с семантикой LIFO или семантикой неупорядоченного мультимножества:

BlockingCollection<int> _blockingStack = new BlockingCollection<int>(

    new ConcurrentStack<int>());

BlockingCollection<int> _blockingBag = new BlockingCollection<int>(

    new ConcurrentBag<int>());

Важно учитывать, что с упорядочением элементов связаны некоторые условия гонки. Если вы позволите тому же коду-производителю отработать ранее любой код-потребитель, а затем выполните код-потребитель после кода-производителя, порядок элементов будет в точности таким же, как у стека:

// Код-производитель

_blockingStack.Add(7);

_blockingStack.Add(13);

_blockingStack.CompleteAdding();

 

// Код-потребитель

// Выводит "13", затем "7".

foreach (int item in _blockingStack.GetConsumingEnumerable())

  Trace.WriteLine(item);

Если код-производитель и код-потребитель выполняются в разных потоках (как это обычно бывает), потребитель всегда получает следующим тот элемент, который был добавлен последним. Например, производитель добавляет 7, потребитель получает 7, затем производитель добавляет 13, потребитель получает 13. Потребитель не ожидает вызова CompleteAdding перед тем, как вернуть первый элемент.

Пояснение

Все, чтобы было сказано о регулировке применительно к блокирующим очередям, также применимо к блокирующим стекам или мультимножествам. Если ваши производители работают быстрее потребителей и вы хотите ограничить использование памяти блокирующим стеком/оче­редью, используйте регулировку так, как показано в рецепте 9.9.

В этом рецепте для кода-потребителя используется GetConsumingEnume­rable —  самый распространенный сценарий. Также существует метод Take, который позволяет потребителю получить только один элемент (вместо потребления всех элементов).

Если вы хотите обращаться к совместно используемым стекам или мультимножествам асинхронно (например, если UI-поток должен действовать в режиме потребителя), обращайтесь к рецепту 9.11.

Дополнительная информация

В рецепте 9.6 рассматриваются блокирующие очереди, которые используются намного чаще блокирующих стеков или мультимножеств.

В рецепте 9.11 рассматриваются асинхронные стеки и мультимножества.

9.8. Асинхронные очереди

Задача

Требуется коммуникационный канал для передачи сообщений или данных из одной части кода в другую по принципу FIFO без блокирования потоков.

Например, один фрагмент кода может загружать данные, которые отправляются по каналу по мере загрузки; при этом UI-поток получает данные и выводит их.

Решение

Требуется очередь с асинхронным API. В базовом фреймворке .NET такого типа нет, но NuGet предоставляет пару возможных решений.

Во-первых, вы можете использовать Channels. Channels — современная библиотека для асинхронных коллекций «производитель/потребитель», уделяющая особое внимание высокому быстродействию в крупномасштабных сценариях. Производители обычно записывают элементы в канал вызовом WriteAsync, а когда они завершат производство элементов, один из них вызывает Complete для уведомления канала о том, что в дальнейшем элементов больше не будет:

Channel<int> queue = Channel.CreateUnbounded<int>();

 

// Код-производитель

ChannelWriter<int> writer = queue.Writer;

await writer.WriteAsync(7);

await writer.WriteAsync(13);

writer.Complete();

 

// Код-потребитель

// Выводит "7", затем "13".

ChannelReader<int> reader = queue.Reader;

await foreach (int value in reader.ReadAllAsync())

  Trace.WriteLine(value);

Более простой код-потребитель использует асинхронные потоки; дополнительную информацию см. в главе 3. На момент написания книги асинхронные потоки были доступны только на самых новых платформах .NET; старые платформы могут использовать следующий паттерн:

// Код-потребитель (старые платформы).

// Выводит "7", затем "13".

ChannelReader<int> reader = queue.Reader;

while (await reader.WaitToReadAsync())

  while (reader.TryRead(out int value))

    Trace.WriteLine(value);

Обратите внимание на двойной цикл while в коде-потребителе для старых платформ, это нормально. Метод WaitToReadAsync будет асинхронно ожидать до того, как элемент станет доступным, или канал будет помечен как завершенный; при наличии элемента, доступного для чтения, возвращается true. Метод TryRead пытается прочитать элемент (немедленно и синхронно), возвращая true, если элемент был прочитан. Если TryRead вернет false, это может объясняться тем, что прямо сейчас доступного элемента нет, или же тем, что канал был помечен как завершенный, и элементов больше вообще не будет. Таким образом, когда TryRead возвращает false, происходит выход из внутреннего цикла while, а потребитель снова вызывает метод WaitToReadAsync, который вернет false, если канал был помечен как завершенный.

Другой вариант организации очереди «производитель/потребитель» — использование BufferBlock<T> из библиотеки TPL Dataflow. Тип Buffer­Block<T> имеет много общего с каналом. Следующий пример показывает, как объявить BufferBlock<T>, как выглядит код-производитель и как выглядит код-потребитель:

var _asyncQueue = new BufferBlock<int>();

 

// Код-производитель.

await _asyncQueue.SendAsync(7);

await _asyncQueue.SendAsync(13);

_asyncQueue.Complete();

 

// Код-потребитель.

// Выводит "7", затем "13".

while (await _asyncQueue.OutputAvailableAsync())

  Trace.WriteLine(await _asyncQueue.ReceiveAsync());

Код-потребитель использует метод OutputAvailableAsync, который на самом деле полезен только с одним потребителем. Если потребителей несколько, может случиться, что OutputAvailableAsync вернет true для нескольких потребителей, хотя элемент только один. Если очередь завершена, то ReceiveAsync выдаст исключение InvalidOperationException. Таким образом, с несколькими потребителями код будет выглядеть так:

while (true)

{

  int item;

  try

  {

    item = await _asyncQueue.ReceiveAsync();

  }

  catch (InvalidOperationException)

  {

    break;

  }

  Trace.WriteLine(item);

}

Также можно воспользоваться типом AsyncProducerConsumerQueue<T> из NuGet-библиотеки Nito.AsyncEx. Его API похож на API BufferBlock<T>, но не совпадает с ним полностью:

var _asyncQueue = new AsyncProducerConsumerQueue<int>();

 

// Код-производитель

await _asyncQueue.EnqueueAsync(7);

await _asyncQueue.EnqueueAsync(13);

_asyncQueue.CompleteAdding();

// Код-потребитель

// Выводит "7", затем "13".

while (await _asyncQueue.OutputAvailableAsync())

  Trace.WriteLine(await _asyncQueue.DequeueAsync());

В этом коде также используется метод OutputAvailableAsync, который обладает теми же недостатками, что и BufferBlock<T>. С несколькими потребителями код обычно выглядит примерно так:

while (true)

{

  int item;

  try

  {

    item = await _asyncQueue.DequeueAsync();

  }

  catch (InvalidOperationException)

  {

    break;

  }

  Trace.WriteLine(item);

}

Пояснение

Рекомендую использовать библиотеку Channels для асинхронных очередей «производитель/потребитель» там, где это возможно. Помимо регулировки поддерживаются несколько режимов выборки, а код тщательно оптимизирован. Однако, если логика вашего приложения может быть выражена в виде «конвейера», через который проходят данные, TPL Dataflow может быть более логичным кандидатом. Последний вариант — AsyncProducerConsumerQueue<T> — имеет смысл в том случае, если в вашем приложении уже используются другие типы из AsyncEx.

lemur.tiff

Библиотека Channels находится в пакете System.Threading.Channels, BufferBlock<T> — в пакете System.Threading.Tasks.Dataflow, а тип AsyncProducerConsumerQueue<T> — в пакете Nito.AsyncEx.

Дополнительная информация

В рецепте 9.6 рассматриваются очереди «производитель/потребитель» с блокирующей семантикой вместо асинхронной.

В рецепте 9.12 рассматриваются очереди «производитель/потребитель» как с блокирующей, так и с асинхронной семантикой.

В рецепте 9.7 рассматриваются асинхронные стеки и мультимножества, если вам нужен аналогичный коммуникационный канал с семантикой FIFO.

9.9. Регулировка очередей

Задача

Имеется очередь «производитель/потребитель», но производители могут работать быстрее потребителей, что может привести к неэффективному использованию памяти. Также вам хотелось бы сохранить все элементы в очереди, а следовательно, понадобится механизм регулировки производителей.

Решение

При использовании очередей «производитель/потребитель» необходимо учитывать, что произойдет, если производители работают быстрее потребителей (если только вы твердо не уверены в том, что потребители всегда работают быстрее). Если вы производите элементы быстрее, чем можете потреблять их, очередь придется отрегулировать. Для этого можно задать максимальное количество элементов. Когда очередь будет «заполнена», она применяет обратное давление к производителям, блокируя их до того, как в очереди не появится свободное место.

Регулировка может выполняться посредством создания ограниченного канала (вместо неограниченного). Так как каналы асинхронны, производители будут регулироваться асинхронно:

Channel<int> queue = Channel.CreateBounded<int>(1);

ChannelWriter<int> writer = queue.Writer;

 

// Эта запись завершается немедленно.

await writer.WriteAsync(7);

 

// Эта запись (асинхронно) ожидает удаления 7

// перед тем как ставить в очередь 13.

await writer.WriteAsync(13);

 

writer.Complete();

Тип BufferBlock<T> имеет встроенную поддержку регулировки, которая более подробно рассмотрена в рецепте 5.4. С блоками потоков данных следует задать параметр BoundedCapacity:

var queue = new BufferBlock<int>(

    new DataflowBlockOptions { BoundedCapacity = 1 });

 

// Эта отправка завершается немедленно.

await queue.SendAsync(7);

 

// Эта отправка (асинхронно) ожидает удаления 7

// перед тем как ставить в очередь 13.

await queue.SendAsync(13);

 

queue.Complete();

Производитель в этом фрагменте кода использует асинхронный API SendAsync; тот же подход работает и для синхронного API Post.

Тип AsyncProducerConsumerQueue<T> из AsyncEx содержит поддержку регулировки. Просто сконструируйте очередь с подходящим значением:

var queue = new AsyncProducerConsumerQueue<int>(maxCount: 1);

 

// Эта операция постановки в очередь завершается немедленно.

await queue.EnqueueAsync(7);

 

// Эта операция постановки в очередь (асинхронно) ожидает удаления 7

// перед тем как ставить в очередь 13.

await queue.EnqueueAsync(13);

 

queue.CompleteAdding();

Блокирующие очереди «производитель/потребитель» также поддерживают регулировку. Вы можете использовать тип BlockingCollection<T> для регулировки количества элементов, для чего при создании передается соответствующее значение:

var queue = new BlockingCollection<int>(boundedCapacity: 1);

 

// Это добавление завершается немедленно.

queue.Add(7);

 

// Это добавление ожидает удаления 7 перед тем, как добавлять 13.

queue.Add(13);

 

queue.CompleteAdding();

Пояснение

Регулировка необходима в том случае, если производители работают быстрее потребителей. Один из сценариев, которые необходимо рассмотреть: могут ли производители работать быстрее потребителей, если ваше приложение будет работать на другом оборудовании? Обычно некоторая регулировка потребуется для того, чтобы гарантировать нормальную работу на будущем оборудовании и/или облачных платформах, которые нередко более ограничены в ресурсах, чем машины разработчиков.

Регулировка создает обратное давление на производителей, замедляя их для того, чтобы потребители гарантированно могли обработать все элементы без создания излишних затрат памяти. Если обрабатывать каждый элемент не обязательно, можно использовать выборку вместо регулировки. Выборка из очередей «производитель/потребитель» рассматривается в рецепте 9.10.

lemur.tiff

Библиотека Channels находится в пакете System.Threading.Channels, тип BufferBlock<T> — в пакете System.Threading.Tasks.Dataflow, а тип AsyncProducerConsumerQueue<T> — в пакете Nito.AsyncEx.

Дополнительная информация

В рецепте 9.8 рассматривается базовое использование асинхронных очередей «производитель/потребитель».

В рецепте 9.6 рассматривается базовое использование синхронных очередей «производитель/потребитель».

В рецепте 9.10 рассматривается выборка в очередях «производитель/потребитель» как альтернатива регулировке.

9.10. Выборка в очередях

Задача

Есть очередь «производитель/потребитель», но производители могут работать быстрее потребителей, что может привести к неэффективному использованию памяти. Сохранять все элементы из очереди не обязательно; необходимо отфильтровать элементы очереди так, чтобы более медленные потребители могли ограничиться обработкой самых важных элементов.

Решение

Библиотека Channels предоставляет самые простые средства применения выборки к элементам ввода. Типичный пример — всегда брать последние n элементов с потерей самых старых элементов при заполнении очереди:

Channel<int> queue = Channel.CreateBounded<int>(

    new BoundedChannelOptions(1)

    {

      FullMode = BoundedChannelFullMode.DropOldest,

    });

ChannelWriter<int> writer = queue.Writer;

 

// Операция записи завершается немедленно.

await writer.WriteAsync(7);

 

// Операция записи тоже завершается немедленно.

// Элемент 7 теряется, если только он не был

// немедленно извлечен потребителем.

await writer.WriteAsync(13);

Это самый простой механизм контроля входных потоков и предотвращения «затопления» потребителей.

Есть и другие режимы BoundedChannelFullMode. Например, если вы хотите, чтобы самые старые элементы сохранялись, можно при заполнении канала терять новые элементы:

Channel<int> queue = Channel.CreateBounded<int>(

    new BoundedChannelOptions(1)

    {

      FullMode = BoundedChannelFullMode.DropWrite,

    });

ChannelWriter<int> writer = queue.Writer;

 

// Операция записи завершается немедленно.

await writer.WriteAsync(7);

 

// Операция записи тоже завершается немедленно.

// Элемент 13 теряется, если только элемент 7 не был

// немедленно извлечен потребителем.

await writer.WriteAsync(13);

Пояснение

Библиотека Channels отлично подходит для простой выборки. Во многих ситуациях чрезвычайно полезен режим BoundedChannelFullMode.DropOldest. Более сложная выборка должна выполняться самими потребителями.

Если выборка должна выполняться по времени (например, «только 10 элементов в секунду»), используйте System.Reactive. В System.Reactive предусмотрены естественные операторы для работы со временем.

lemur.tiff

Библиотека Channels находится в пакете System.Threading.Channels.

Дополнительная информация

В рецепте 9.9 рассматривается регулировка каналов, которая ограничивает количество элементов в канале посредством блокировки производителей вместо потери элементов.

В рецепте 9.8 рассматривается базовое использование каналов, включая код производителя и потребителя.

В рецепте 6.4 рассматриваются регулировка и выборка в библиотеке System.Reactive, которая поддерживает выборку по времени.

9.11. Асинхронные стеки и мультимножества

Задача

Требуется коммуникационный канал для передачи сообщений или данных из одной части кода в другую, но вы не хотите, чтобы этот канал использовал семантику FIFO.

Решение

Библиотека Nito.AsyncEx предоставляет тип AsyncCollection<T>, который по умолчанию работает как асинхронная очередь, но он также может работать как любая разновидность коллекций «производитель/потребитель». Обертка для IProducerConsumerCollection<T>AsyncCollection<T> — также является async-эквивалентом типа .NET BlockingCollection<T>, который рассматривается в рецепте 9.7.

Тип AsyncCollection<T> поддерживает семантику LIFO (стек) или неупорядоченности (мультимножество) в зависимости от того, какая коллекция передается его конструктору:

var _asyncStack = new AsyncCollection<int>(

    new ConcurrentStack<int>());

var _asyncBag = new AsyncCollection<int>(

    new ConcurrentBag<int>());

Обратите внимание на состояние гонки в отношении упорядочения элементов в стеке. Если все производители завершатся до того, как начнут работать потребители, то порядок элементов будет соответствовать обычному стеку:

// Код-производитель

await _asyncStack.AddAsync(7);

await _asyncStack.AddAsync(13);

_asyncStack.CompleteAdding();

 

// Код-потребитель

// Выводит "13", затем "7".

while (await _asyncStack.OutputAvailableAsync())

  Trace.WriteLine(await _asyncStack.TakeAsync());

Если производители и потребители выполняются конкурентно (как это обычно бывает), потребитель всегда получает элемент, который был добавлен последним. Это приводит к тому, что поведение коллекции в целом не всегда соответствует поведению стека. Конечно, у мультимножества упорядочение вообще отсутствует.

В AsyncCollection<T> предусмотрена поддержка регулировки, которая необходима в тех случаях, когда производители могут добавлять данные в коллекцию быстрее, чем потребители их извлекают. Просто сконструи­руйте коллекцию с нужным значением:

var _asyncStack = new AsyncCollection<int>(

    new ConcurrentStack<int>(), maxCount: 1);

Теперь тот же код-производитель будет асинхронно ожидать по мере необходимости:

// Это добавление завершается немедленно.

await _asyncStack.AddAsync(7);

 

// Это добавление (асинхронно) ожидает удаления 7

// перед тем как помещать в очередь 13.

await _asyncStack.AddAsync(13);

 

_asyncStack.CompleteAdding();

Код-потребитель использует тип OutputAvailableAsync, на который распространяются ограничения, описанные в рецепте 9.8. При наличии нескольких потребителей код-потребитель обычно выглядит примерно так:

while (true)

{

  int item;

  try

  {

    item = await _asyncStack.TakeAsync();

  }

  catch (InvalidOperationException)

  {

    break;

  }

  Trace.WriteLine(item);

}

Пояснение

AsyncCollection<T> представляет собой асинхронный эквивалент Blocking­Collection<T> с несколько отличающимся API.

lemur.tiff

Тип AsyncCollection<T> находится в пакете Nito.AsyncEx.

Дополнительная информация

В рецепте 9.8 рассматриваются асинхронные очереди, намного более распространенные, чем асинхронные стеки или мультимножества.

В рецепте 9.7 рассматриваются синхронные (блокирующие) стеки и мультимножества.

9.12. Блокирующие/асинхронные очереди

Задача

Требуется коммуникационный канал для передачи сообщений или данных из одной части кода в другую по принципу FIFO, но при этом необходима гибкость для того, чтобы сторона производителя или сторона потребителя могла рассматриваться как синхронная или асинхронная.

Например, фоновый поток может загружать данные и заносить их в коммуникационный канал, и вы хотите, чтобы фоновый поток синхронно блокировался при заполнении канала. В это время UI-поток получает данные из коммуникационного канала, и вы хотите, чтобы UI-поток асинхронно извлекал данные из канала, чтобы он продолжал реагировать на действия пользователя.

Решение

После знакомства с блокирующими очередями в рецепте 9.6 и асинхронными очередями в рецепте 9.8 мы изучим несколько типов очередей, поддерживающих как блокирующие, так и асинхронные API.

Начнем с типов BufferBlock<T> и ActionBlock<T> из NuGet-библиотеки TPL Dataflow. Тип BufferBlock<T> может легко использоваться как асинхронная очередь «производитель/потребитель» (за подробностями обращайтесь к рецепту 9.8):

var queue = new BufferBlock<int>();

 

// Код-производитель

await queue.SendAsync(7);

await queue.SendAsync(13);

queue.Complete();

 

// Для одного потребителя

while (await queue.OutputAvailableAsync())

  Trace.WriteLine(await queue.ReceiveAsync());

 

// Для нескольких потребителей

while (true)

{

  int item;

  try

  {

    item = await queue.ReceiveAsync();

  }

  catch (InvalidOperationException)

  {

    break;

  }

 

  Trace.WriteLine(item);

}

Как показано в следующем примере, BufferBlock<T> также поддерживает синхронный API для производителей и потребителей:

var queue = new BufferBlock<int>();

 

// Код-производитель

queue.Post(7);

queue.Post(13);

queue.Complete();

 

// Код-потребитель

while (true)

{

  int item;

  try

  {

    item = queue.Receive();

  }

  catch (InvalidOperationException)

  {

    break;

  }

 

  Trace.WriteLine(item);

}

Код-потребитель, использующий BufferBlock<T>, получается довольно неудобным, так как не соответствует стилю программирования потоков данных. Библиотека TPL Dataflow включает ряд блокировок, которые могут объединяться в цепочки для определения реактивной сети. В данном случае очередь «производитель/потребитель», завершающая конкретное действие, может определяться с помощью ActionBlock<T>:

// Код-потребитель передается конструктору очереди

ActionBlock<int> queue = new ActionBlock<int>(item =>

   Trace.WriteLine(item));

// Асинхронный код-производитель

await queue.SendAsync(7);

await queue.SendAsync(13);

 

// Синхронный код-производитель

queue.Post(7);

queue.Post(13);

queue.Complete();

Если библиотека TPL Dataflow недоступна на нужной вам платформе(-ах), в Nito.AsyncEx существует тип AsyncProducerConsumerQueue<T>, который также поддерживает как синхронные, так и асинхронные методы:

var queue = new AsyncProducerConsumerQueue<int>();

 

// Асинхронный код-производитель

await queue.EnqueueAsync(7);

await queue.EnqueueAsync(13);

 

// Синхронный код-производитель

queue.Enqueue(7);

queue.Enqueue(13);

 

queue.CompleteAdding();

 

// Для одного асинхронного потребителя

while (await queue.OutputAvailableAsync())

  Trace.WriteLine(await queue.DequeueAsync());

 

// Для нескольких асинхронных потребителей

while (true)

{

  int item;

  try

  {

    item = await queue.DequeueAsync();

  }

  catch (InvalidOperationException)

  {

    break;

  }

  Trace.WriteLine(item);

}

 

// Синхронный код-потребитель

foreach (int item in queue.GetConsumingEnumerable())

  Trace.WriteLine(item);

Пояснение

Рекомендую по возможности использовать BufferBlock<T> или Action­Block<T>, потому что библиотека TPL Dataflow была протестирована более тщательно, чем библиотека Nito.AsyncEx. Однако тип Async­Producer­ConsumerQueue<T> тоже может пригодиться, если приложение уже использует другие типы из библиотеки AsyncEx.

System.Threading.Channels также можно использовать синхронно, но только косвенно. Их естественный API является асинхронным, но поскольку эти коллекции относятся к числу потокобезопасных, вы можете заставить их работать синхронно, упаковав код производства или потребления в Task.Run с  последующим блокированием по задаче, возвращенной Task.Run:

Channel<int> queue = Channel.CreateBounded<int>(10);

 

// Код-производитель

ChannelWriter<int> writer = queue.Writer;

Task.Run(async () =>

{

  await writer.WriteAsync(7);

  await writer.WriteAsync(13);

  writer.Complete();

}).GetAwaiter().GetResult();

 

// Код-потребитель

ChannelReader<int> reader = queue.Reader;

Task.Run(async () =>

{

  while (await reader.WaitToReadAsync())

    while (reader.TryRead(out int value))

      Trace.WriteLine(value);

}).GetAwaiter().GetResult();

Блоки TPL Dataflow,  AsyncProducerConsumerQueue<T> и Channels поддерживают регулировку, для чего при конструировании необходимо задать соответствующие параметры. Регулировка необходима в ситуациях, в которых производители заносят элементы в коллекцию быстрее, чем потребители могут потреблять их, в результате чего приложение будет расходовать слишком много памяти.

lemur.tiff

Типы BufferBlock<T> и ActionBlock<T> находятся в паке­те System.Threading.Tasks.Dataflow. Тип AsyncProducer­Consumer­Queue<T> находится в пакете Nito.AsyncEx. Библиотека Channels находится в пакете System.Threading.Channels.

Дополнительная информация

В рецепте 9.6 рассматриваются блокирующие очереди «производитель/потребитель».

В рецепте 9.8 рассматриваются асинхронные очереди «производитель/потребитель».

В рецепте 5.4 рассматривается регулировка блоков потоков данных.

Назад: Глава 8. Взаимодействие
Дальше: Глава 10. Отмена