Книга: Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.
Назад: Глава 7. Тестирование
Дальше: Глава 9. Коллекции

Глава 8. Взаимодействие

Асинхронное, параллельное, реактивное программирование — каждое из них на своем месте, но как они работают в сочетании друг с другом?

В этой главе рассматриваются различные сценарии взаимодействий, которые научат вас объединять разные подходы. Вы узнаете, как они взаимно дополняют друг друга, вместо того чтобы конкурировать; на границах соприкосновения двух подходов почти нет каких-либо неровностей.

8.1. Асинхронные обертки для «Async»-методов с «Completed»-событиями

Задача

Существует старый асинхронный паттерн, в котором используются методы с именами вида ОперацияAsync и события с именами вида ОперацияCompleted. Требуется выполнить операцию с использованием старого асинхронного паттерна и использовать await с результатом.

lemur.tiff

Паттерн ОперацияAsync/ОперацияCompleted называется асинхронным паттерном на основе событий (EAP, Event-based Asynchronous Pattern). Они будут упакованы в метод, возвращающий Task и реализующий асинхронный паттерн на основе Task (Task-based Asynchronous Pattern).

Решение

С помощью типа TaskCompletionSource<TResult> можно создавать обертки для асинхронных операций. ТипTaskCompletionSource<TResult> управляет Task<TResult> и позволяет завершить задачу в нужный момент.

В следующем примере определяется метод расширения для WebClient, который загружает строку. Тип WebClient определяет методы Download­StringAsync и DownloadStringCompleted. С их помощью можно определить метод DownloadStringTaskAsync:

public static Task<string> DownloadStringTaskAsync(this WebClient client,

    Uri address)

{

  var tcs = new TaskCompletionSource<string>();

 

  // Обработка события завершит задачу и отменит свою регистрацию.

  DownloadStringCompletedEventHandler handler = null;

  handler = (_, e) =>

  {

    client.DownloadStringCompleted -= handler;

    if (e.Cancelled)

      tcs.TrySetCanceled();

    else if (e.Error != null)

      tcs.TrySetException(e.Error);

    else

      tcs.TrySetResult(e.Result);

  };

 

  // Зарегистрировать событие и *затем* начать операцию.

  client.DownloadStringCompleted += handler;

  client.DownloadStringAsync(address);

  return tcs.Task;

}

Пояснение

Этот конкретный пример не слишком полезен, потому что WebClient уже определяет DownloadStringTaskAsync, а вы можете использовать более удобную для async версию HttpClient. Тем не менее этот прием также может использоваться для взаимодействия со старым асинхронным кодом, который еще не был обновлен для использования Task.

lemur.tiff

В новом коде всегда следует использовать HttpClient. Используйте WebClient только в том случае, если вы работаете с унаследованным кодом.

В обычной ситуации TAP-метод для загрузки строк будет называться ОперацияAsync (например, DownloadStringAsync); тем не менее эта схема формирования имен в данном случае не работает, потому что EAP уже определяет метод с таким именем. В таком случае TAP-методу присваивается имя ОперацияTaskAsync (например, DownloadStringTaskAsync).

При создании оберток для EAP-методов существует вероятность того, что «стартовый» метод может выдать исключение; в предыдущем примере это может произойти в DownloadStringAsync. В этом случае необходимо решить, разрешить ли исключению распространяться или перехватить исключение и вызвать TrySetException. В большинстве случае исключения, выданные в этой точке, происходят от ошибок использования, поэтому неважно, какой из вариантов вы выберете. Если вы не уверены в том, являются ли исключения ошибками использования, рекомендую перехватить исключение и вызвать TrySetException.

Дополнительная информация

В рецепте 8.2 рассматриваются TAP-обертки для методов APM (Begin­Operation и EndOperation).

В рецепте 8.3 рассматриваются TAP-обертки для любых типов уведомлений.

8.2. Асинхронные обертки для методов «Begin/End»

Задача

В старом асинхронном паттерне используются пары методов с именами BeginОперация и EndОперация, а также объектом IAsyncResult, представляю­щим асинхронную операцию. Имеется операция, реализованная на базе старого асинхронного паттерна; требуется организовать ее потребление с ключевым словом await.

lemur.tiff

Паттерн «BeginОперация/EndОперация» называется асинхронной моделью программирования (APM, Asynchronous Programming Model). Он будет упакован в метод, возвращающий Task и реализующий асинхронный паттерн на основе Task (Task-based Asynchronous Pattern).

Решение

Лучший способ упаковки APM — использование одного из методов From­Async с типом TaskFactory. FromAsync использует TaskCompletionSource­<TResult>  во внутренней реализации, но при создании обертки для APM FromAsync намного проще использовать.

Этот пример определяет метод расширения для WebRequest, который отправляет запрос HTTP и получает ответ. Тип WebRequest определяет BeginGetResponse и EndGetResponse; вы можете определить метод GetResponseAsync в следующем виде:

public static Task<WebResponse> GetResponseAsync(this WebRequest client)

{

  return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse,

      client.EndGetResponse, null);

}

Пояснение

У FromAsync есть множество перегруженных версий, от которых голова идет кругом!

Как правило, лучше всего вызывать FromAsync так, как это сделано в нашем примере. Сначала передайте метод BeginОперация (не вызывая его), затем передайте метод EndОперация (тоже без вызова). Затем передайте все аргументы, которые получает BeginОперация, кроме последних аргументов AsyncCallback и object. Наконец, передайте null.

Не вызывайте метод BeginОперация перед вызовом FromAsync. Можно вызвать FromAsync с передачей объекта IAsyncOperation, полученного от BeginОперация, но при таком вызове FromAsync придется использовать менее эффективную реализацию.

Возможно, вас интересует, почему в рекомендованном паттерне в конце всегда передается null. Метод FromAsync появился вместе с типом Task в .NET 4.0, перед появлением async. В то время в асинхронных обратных вызовах было принято использовать объекты state, а тип Task поддерживает эту возможность с помощью своего метода AsyncState. В новом паттерне async объекты state уже не нужны, поэтому в параметре state можно всегда передавать null. В те дни параметр state использовался только для предотвращения создания экземпляра замыкания (closure) при оптимизации использования памяти.

Дополнительная информация

В рецепте 8.3 рассматривается написание TAP-оберток для любых типов уведомлений.

8.3. Асинхронные обертки для чего угодно

Задача

Есть нетипичная или нестандартная асинхронная операция или событие. Требуется обеспечить их потребление с ключевым словом await.

Решение

Тип TaskCompletionSource<T> может использоваться для построения объектов Task<T> в любых сценариях. С помощью TaskCompletionSource<T> можно завершить задачу тремя разными способами: с успешным результатом, с отказом или с отменой.

До появления async компания Microsoft рекомендовала использовать два других асинхронных паттерна: APM (рецепт 8.2) и EAP (рецепт 8.1). Однако и APM, и EAP были достаточно неудобными, а в некоторых ситуациях их было трудно реализовать. По этой причине появилась неофициальная схема, основанная на обратных вызовах, с методами следующего вида:

public interface IMyAsyncHttpService

{

  void DownloadString(Uri address, Action<string, Exception> callback);

}

Такие методы следуют соглашению, согласно которому DownloadString запускает (асинхронную) загрузку, а при завершении callback активизируется либо с результатом, либо с исключением. Обычно callback активизируется в фоновом потоке.

Нестандартные асинхронные методы (вроде приведенного в предыдущем примере) могут быть упакованы в TaskCompletionSource<T>, чтобы они естественным образом работали с await, как в следующем примере:

public static Task<string> DownloadStringAsync(

    this IMyAsyncHttpService , Uri address)

{

  var tcs = new TaskCompletionSource<string>();

  , (result, exception) =>

  {

    if (exception != null)

      tcs.TrySetException(exception);

    else

      tcs.TrySetResult(result);

  });

  return tcs.Task;

}

Пояснение

Тот же паттерн TaskCompletionSource<T> может использоваться для упаковки любых асинхронных методов, какими бы нестандартными они ни были. Сначала создайте экземпляр TaskCompletionSource<T>. Затем организуйте обратный вызов, чтобы TaskCompletionSource<T> завершал свою задачу соответствующим образом. Запустите асинхронную операцию и наконец верните объект Task<T>, связанный с этим TaskCompletionSource<T>.

В этом паттерне важно быть уверенным в том, чтобы объект TaskCompletion­Source<T> всегда завершался. Тщательно продумайте обработку ошибок и убедитесь в том, чтобы TaskCompletionSource<T> завершался соответствующим образом. В последнем примере исключения явно передаются обратному вызову, так что блок catch не понадобится; но некоторые нестандартные паттерны могут потребовать перехвата исключений в обратных вызовах и включения их в TaskCompletionSource<T>.

Дополнительная информация

В рецепте 8.1 рассматриваются TAP-обертки для методов EAP (Опера­цияAsync, ОперацияCompleted).

В рецепте 8.2 рассматриваются TAP-обертки для методов APM (BeginОпе­рация, EndОперация).

8.4. Асинхронные обертки для параллельного кода

Задача

Существуют параллельные вычисления (создающие нагрузку на процессор), которые вы хотите потреблять с помощью await. Обычно бывает нужно, чтобы UI-поток не блокировался в ожидании завершения параллельных вычислений.

Решение

Тип Parallel и Parallel LINQ используют пул потоков для выполнения параллельной обработки. При этом вызывающий поток также включается в число потоков параллельной обработки, так что при вызове параллельного метода из UI-потока пользовательский интерфейс перестанет реагировать на действия пользователя до завершения обработки.

Чтобы пользовательский интерфейс не блокировался, упакуйте параллельную обработку в Task.Run и примените await к результату:

await Task.Run(() => Parallel.ForEach(...));

Ключевой аспект этого рецепта заключается в том, что параллельный код включает вызывающий поток в свой пул потоков, используемых для параллельной обработки. Это относится как к Parallel LINQ, так и к классу Parallel.

Пояснение

Это простой рецепт, но его часто упускают из виду. Используя Task.Run, вы перемещаете всю параллельную обработку в пул потоков. Task.Run возвращает объект Task, представляющий параллельную работу, а UI-поток может (асинхронно) ожидать его завершения.

Этот рецепт относится только к UI-коду. На стороне сервера (например, ASP.NET) параллельная обработка выполняется редко, потому что параллелизм уже обеспечивается серверным хостом. По этой причине код на стороне сервера не должен ни выполнять параллельную обработку, ни передавать ее пулу потоков.

Дополнительная информация

В главе 4 рассматриваются основные принципы параллельного кода.

В главе 2 рассматриваются основные принципы асинхронного кода.

8.5. Асинхронные обертки для наблюдаемых объектов System.Reactive

Задача

Имеется наблюдаемый поток, который требуется потреблять с использованием await.

Решение

Сначала необходимо решить, какие события наблюдаемого объекта в потоке событий вас интересуют. Самые распространенные ситуации:

• Последнее событие перед завершением потока.

• Следующее событие.

• Все события.

Чтобы получить последнее событие в потоке, можно применить await либо к результату LastAsync, либо к наблюдаемому объекту напрямую:

IObservable<int> observable = ...;

int lastElement = await observable.LastAsync();

// или:  int lastElement = await observable;

Когда вы используете await с наблюдаемым объектом или LastAsync, код (асинхронно) ожидает завершения потока, после чего возвращает последний элемент. Во внутренней реализации await подписывается на поток.

Чтобы получить следующее событие в потоке, используйте FirstAsync. В следующем коде await подписывается на поток, после чего завершается (и отменяет подписку) сразу же с поступлением первого события:

IObservable<int> observable = ...;

int nextElement = await observable.FirstAsync();

Для отслеживания всех событий в потоке можно воспользоваться ToList:

IObservable<int> observable = ...;

IList<int> allElements = await observable.ToList();

Пояснение

Библиотека System.Reactive предоставляет все инструменты, необходимые для потребления потоков с использованием await. Единственный нюанс заключается в том, что придется думать о том, будет ли объект, допускаю­щий ожидание, ожидать до завершения потока. Из примеров данного рецепта LastAsync, ToList и собственно await будут ожидать завершения потока; FirstAsync будет ожидать только следующего события.

Если этих примеров вам недостаточно, вспомните, что в вашем распоряжении вся мощь LINQ, а также манипуляторы System.Reactive. Такие операции, как Take и Buffer, помогут в асинхронном ожидании необходимых элементов без ожидания завершения всего потока.

Некоторые операторы для использования с await — такие, как FirstAsync и LastAsync, — не возвращают Task<T>. Если вы планируете использовать Task.WhenAll или Task.WhenAny, то вам понадобится объект Task<T>, который можно получить вызовом ToTask для любого наблюдаемого объекта. ToTask вернет объект Task<T>, который завершается с последним значением в потоке.

Дополнительная информация

В разделе 8.6 рассматривается использование асинхронного кода с наблюдаемым потоком.

В разделе 8.8 рассматривается использование наблюдаемых потоков в качестве входных данных для блока потока данных (который может выполнять асинхронную работу).

В разделе 6.3 рассматривается использование окон и буферизации для наблюдаемых потоков.

8.6. Наблюдаемые обертки для асинхронного кода в System.Reactive

Задача

Имеется асинхронная операция, которую требуется объединить с функциональностью наблюдаемых объектов.

Решение

Любая асинхронная операция может интерпретироваться как наблюдаемый поток, который делает одно из двух:

• производит один элемент, после чего завершается;

• выдает отказ без генерирования каких-либо элементов.

Для реализации этой трансформации в библиотеке System.Reactive существует простое преобразование Task<T> в IObservable<T>. Следующий код запускает асинхронную загрузку веб-страниц, интерпретируя ее как наблюдаемую последовательность:

IObservable<HttpResponseMessage> GetPage(HttpClient client)

{

  Task<HttpResponseMessage> task =

      client.GetAsync("/");

  return task.ToObservable();

}

Решение с ToObservable предполагает, что вы уже вызвали async-метод и у вас имеется объект Task для преобразования.

Другой подход основан на вызове StartAsync. StartAsync также вызывает async-метод немедленно, но с поддержкой отмены: если подписка будет отменена, то async-метод отменяется:

IObservable<HttpResponseMessage> GetPage(HttpClient client)

{

  return Observable.StartAsync(

      token => client.GetAsync("/", token));

}

Как ToObservable, так  и StartAsync немедленно запускают асинхронную операцию без ожидания подписки; наблюдаемый объект является «горячим». Чтобы создать «холодный» наблюдаемый объект, который запускает операцию только при подписке, используйте метод FromAsync (который так же поддерживает отмену, как и StartAsync):

IObservable<HttpResponseMessage> GetPage(HttpClient client)

{

  return Observable.FromAsync(

      token => client.GetAsync("/", token));

}

FromAsync существенно отличается от ToObservable и StartAsync, которые возвращают наблюдаемый объект для уже запущенной async-операции. FromAsync запускает новую независимую async-операцию каждый раз, когда создается подписка.

Наконец, можно использовать специальные перегруженные версии Select­Many для запуска асинхронных операций для каждого события в исходном потоке по мере их поступления. SelectMany также поддерживает отмену.

Следующий пример получает существующий поток событий c URL, после чего инициирует запрос при получении каждого URL:

IObservable<HttpResponseMessage> GetPages(

    IObservable<string> urls, HttpClient client)

{

  return urls.SelectMany(

      (url, token) => client.GetAsync(url, token));

}

Пояснение

Библиотека System.Reactive существовала до появления async, но эти (и другие) операторы были добавлены для нормального взаимодействия с кодом async. Рекомендую использовать описанные в этом рецепте операторы даже в том случае, если ту же функциональность можно построить с использованием других операторов System.Reactive.

Дополнительная информация

В рецепте 8.5 рассматривается потребление наблюдаемых потоков в асинхронном коде.

В рецепте 8.8 рассматривается использование блоков потоков данных (которые могут содержать асинхронный код) как источников наблюдаемых потоков.

8.7. Асинхронные потоки и сети потоков данных

Задача

В одной части вашего решения используются асинхронные потоки, а в другой — сети потоков данных. Требуется организовать передачу данных между ними.

Решение

Потребление Channel как асинхронного потока встроено непосредственно в тип Channel; подробности см. в рецепте 9.8. Потребление блока TPL Dataflow как асинхронного потока происходит немного сложнее, но безусловно реально. На мой взгляд, проще сначала определить метод расширения для блоков потоков данных, которые делают их API более сходным с Channel, а затем использовать их как асинхронный поток с помощью этого метода расширения:

public static class DataflowExtensions

{

  public static bool TryReceiveItem<T>(this ISourceBlock<T> block,

     out T value)

  {

    if (block is IReceivableSourceBlock<T> receivableSourceBlock)

      return receivableSourceBlock.TryReceive(out value);

 

    try

    {

      value = block.Receive(TimeSpan.Zero);

      return true;

    }

    catch (TimeoutException)

    {

      // На данный момент доступного элемента нет.

      value = default;

      return false;

    }

    catch (InvalidOperationException)

    {

      // Блок завершен, элементов больше нет.

      value = default;

      return false;

    }

  }

 

  public static async IAsyncEnumerable<T> ReceiveAllAsync<T>(

      this ISourceBlock<T> block,

      [EnumeratorCancellation] CancellationToken cancellationToken =

         default)

  {

    while (await block

        .OutputAvailableAsync(cancellationToken).ConfigureAwait(false))

    {

      while (block.TryReceiveItem(out var value))

      {

        yield return value;

      }

    }

  }

}

Подробную информацию об атрибуте EnumeratorCancellation см. в рецепте 3.4.

С методом расширения из приведенного примера любой выходной блок потока данных может использоваться как асинхронный поток:

var multiplyBlock = new TransformBlock<int, int>(value => value * 2);

 

multiplyBlock.Post(5);

multiplyBlock.Post(2);

multiplyBlock.Complete();

 

await foreach (int item in multiplyBlock.ReceiveAllAsync())

{

  Console.WriteLine(item);

}

Также возможно использовать асинхронный поток в качестве источника элементов для блока потока данных. Все, что для этого понадобится, — это цикл, который извлекает элементы и помещает их в блок. В приведенном ниже коде действует пара допущений, которые могут быть неприемлемы в некоторых сценариях. Во-первых, предполагается, что блок должен завершаться при завершении потока. Во-вторых, он начинает выполняться в своем вызывающем потоке; в некоторых сценариях бывает нужно, чтобы весь цикл выполнялся в потоке из пула потоков:

public static async Task WriteToBlockAsync<T>(

    this IAsyncEnumerable<T> enumerable,

    ITargetBlock<T> block, CancellationToken token = default)

{

  try

  {

    await foreach (var item in enumerable

        .WithCancellation(token).ConfigureAwait(false))

    {

      await block.SendAsync(item, token).ConfigureAwait(false);

    }

 

    block.Complete();

  }

  catch (Exception ex)

  {

    block.Fault(ex);

  }

}

Пояснение

Предполагается, что методы расширения в этом рецепте станут отправной точкой для дальнейшей работы. В частности, метод расширения WriteToBlockAsync основан на некоторых допущениях; прежде чем пользоваться этими методами, обязательно продумайте их поведение и убедитесь в том, что оно соответствует вашей ситуации.

Дополнительная информация

В рецепте 9.8 рассматривается потребление Channel как асинхронного потока.

В рецепте 3.4 рассматривается отмена асинхронных потоков.

В главе 5 представлены рецепты для TPL Dataflow.

В главе 3 представлены рецепты для асинхронных потоков.

8.8. Наблюдаемые объекты System.Reactive Observables и сети потока данных

Задача

В одной части решения используются наблюдаемые объекты System.Reactive, в другой — сети потоков данных. Требуется организовать их взаимодействие.

И у наблюдаемых объектов System.Reactive, и у сетей потоков данных существуют свои области применения, которые перекрываются на концептуальном уровне; этот рецепт показывает, как легко организовать их совместную работу, чтобы вы могли выбрать наилучший инструмент для каждой части работы.

Решение

Для начала рассмотрим использование блока потока данных как входа для наблюдаемого потока. Следующий пример создает буферный блок (который не выполняет никакой обработки), а также наблюдаемый интерфейс на базе этого блока вызовом AsObservable:

var buffer = new BufferBlock<int>();

IObservable<int> integers = buffer.AsObservable();

integers.Subscribe(data => Trace.WriteLine(data),

    ex => Trace.WriteLine(ex),

    () => Trace.WriteLine("Done"));

buffer.Post(13);

Как буферные блоки, так и наблюдаемые потоки могут завершаться нормально или с ошибкой, и метод AsObservable преобразует завершение блока (или отказ) в завершение наблюдаемого потока. Однако если в блоке происходит отказ с выдачей исключения, при передаче наблюдаемому потоку это исключение будет упаковано в объект AggregateException. Происходящее напоминает механизм распространения отказов связанными блоками.

Ненамного сложнее взять сеть и работать с ней как с приемником для наблюдаемого потока. Следующий код вызывает AsObserver, чтобы блок мог подписаться на наблюдаемый поток:

IObservable<DateTimeOffset> ticks =

    Observable.Interval(TimeSpan.FromSeconds(1))

        .Timestamp()

        .Select(x => x.Timestamp)

        .Take(5);

var display = new ActionBlock<DateTimeOffset>(x => Trace.WriteLine(x));

ticks.Subscribe(display.AsObserver());

try

{

  display.Completion.Wait();

  Trace.WriteLine("Done.");

}

catch (Exception ex)

{

  Trace.WriteLine(ex);

}

Как и прежде, завершение наблюдаемого потока преобразуется в завершение блока, а любые ошибки из наблюдаемого потока преобразуются в отказ блока.

Пояснение

У блоков потоков данных и наблюдаемых потоков на концептуальном уровне есть много общего: через них проходят данные, в обоих случаях поддерживаются завершение и отказы. Они проектировались для разных сценариев; библиотека TPL Dataflow предназначалась для комбинаций асинхронного и параллельного программирования, а System.Reactive — для реактивного программирования. Впрочем, это концептуальное перекрытие совместимо в достаточной мере, чтобы они очень хорошо и естественно работали друг с другом.

Дополнительная информация

В рецепте 8.5 рассматривается потребление наблюдаемых потоков в асинхронном коде.

В рецепте 8.6 рассматривается использование асинхронного кода в наблюдаемых потоках.

8.9. Преобразование наблюдаемых объектов System.Reactive в асинхронные потоки

Задача

В отдельной части вашего решения используются наблюдаемые объекты System.Reactive. Требуется потреблять их как асинхронные потоки.

Решение

Наблюдаемые объекты System.Reactive работают по принципу проталкивания, а асинхронные потоки — по принципу вытягивания. А значит, необходимо прежде всего понять, что существует концептуальное несоответствие. Требуется обеспечить отклик для наблюдаемого потока и сохранить его уведомления до того, как они будут запрошены потреб­ляющим кодом.

Самое прямолинейное решение уже включено в библиотеку System.Linq.Async:

IObservable<long> observable =

    Observable.Interval(TimeSpan.FromSeconds(1));

 

// ПРЕДУПРЕЖДЕНИЕ: может потреблять неограниченную память; см. обсуждение!

IAsyncEnumerable<long> enumerable =

    observable.ToAsyncEnumerable();

lemur.tiff

Метод расширения ToAsyncEnumerable находится в NuGet-пакете System.Linq.Async.

Важно понять, что этот простой метод расширения ToAsyncEnumerable  во внутренней реализации использует неограниченную очередь «производитель/потребитель». По сути он не отличается от метода расширения, который вы можете написать самостоятельно с использованием Channel как неограниченной очереди «производитель/потребитель»:

// ПРЕДУПРЕЖДЕНИЕ: может потреблять неограниченную память; см. обсуждение!

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(

    this IObservable<T> observable)

{

  Channel<T> buffer = Channel.CreateUnbounded<T>();

  using (observable.Subscribe(

      value => buffer.Writer.TryWrite(value),

      error => buffer.Writer.Complete(error),

      () => buffer.Writer.Complete()))

  {

    await foreach (T item in buffer.Reader.ReadAllAsync())

      yield return item;

  }

}

Эти решения просты, но используют неограниченные очереди, поэтому пользоваться ими следует только в том случае, если вы уверены, что потребитель сможет (со временем) не отстать от наблюдаемых событий. Если потребитель в течение какого-то времени отстает от производителя, ничего страшного; в это время наблюдаемые события сохраняются в буфере. Если потребитель со временем догонит его, приведенные решения будут работать нормально. Но если производитель всегда работает быстрее потребителя, наблюдаемые события продолжат поступать, буфер будет расширяться и со временем займет всю память процесса.

Чтобы избежать проблем с памятью, можно воспользоваться ограниченной очередью. Правда, вам придется решить, что делать с лишними элементами, если наблюдаемые события переполнят очередь. Один из возможных вариантов — отбросить лишние элементы; в следующем примере ограниченный канал используется для уничтожения самого старого наблюдаемого уведомления при переполнении буфера:

// ПРЕДУПРЕЖДЕНИЕ: возможна потеря элементов; см. обсуждение!

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(

    this IObservable<T> observable, int bufferSize)

{

  var bufferOptions = new BoundedChannelOptions(bufferSize)

  {

    FullMode = BoundedChannelFullMode.DropOldest,

  };

  Channel<T> buffer = Channel.CreateBounded<T>(bufferOptions);

  using (observable.Subscribe(

      value => buffer.Writer.TryWrite(value),

      error => buffer.Writer.Complete(error),

      () => buffer.Writer.Complete()))

  {

    await foreach (T item in buffer.Reader.ReadAllAsync())

      yield return item;

  }

}

Пояснение

Если ваш производитель работает быстрее, чем потребитель, придется либо буферизовать элементы производителя (в предположении, что потребитель со временем его догонит), либо ограничить их количество. Второе решение в этом рецепте ограничивает элементы производителя за счет потери элементов, не помещающихся в буфере. Также можно ограничить элементы производителя с помощью операторов наблюдаемых объектов, предназначенных для этой цели, — таких, как Throttle или Sample; за подробностями обращайтесь к рецепту 6.4. В зависимости от ваших потребностей может быть лучше применить Throttle или Sample к входному наблюдаемому объекту перед его преобразованием в IAsyncEnumerable<T> одним из способов, описанных в этом рецепте.

Кроме ограниченных и неограниченных очередей существует третий вариант, который здесь не описан: использование обратного давления (backpressure) для уведомления наблюдаемого потока о том, что он должен перестать производить уведомления, пока буфер не будет готов принять их. К сожалению, в System.Reactive паттерн обратного давления еще не стандартизирован, поэтому на момент написания книги этот вариант еще не был работоспособным. Обратное давление — сложный и неоднозначный паттерн, а в реактивных библиотеках для других языков были реализованы разные паттерны для обратного давления. Пока неясно, примет ли System.Reactive один из этих паттернов, изобретет ли собственный паттерн обратного давления или просто оставит проблему обратного давления неразрешенной.

Дополнительная информация

В рецепте 6.4 рассматриваются операторы System.Reactive, предназначенные для регулировки ввода.

В рецепте 9.8 рассматривается использование Channel как неограниченной очереди «производитель/потребитель».

В рецепте 9.10 рассматривается использование Channel как очереди выборки с потерей элементов при переполнении.

Назад: Глава 7. Тестирование
Дальше: Глава 9. Коллекции