Книга: Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.
Назад: Глава 5. Основы Dataflow
Дальше: Глава 7. Тестирование

Глава 6. Основы System.Reactive

LINQ — набор языковых средств, которые могут использоваться разработчиками для выдачи запросов к последовательностям. Два самых популярных провайдера LINQ — LINQ to Objects (на базе IEnumerable<T>) и LINQ to Entities (на базе IQueryable<T>). Есть множество других провайдеров, имеющих сходную общую структуру. Запросы обрабатываются в отложенном режиме (lazily), а последовательности генерируют значения по мере необходимости. На концептуальном уровне используется модель с вытягиванием; при обработке элементы-значения извлекаются из очереди по одному.

System.Reactive (Rx) интерпретирует события как последовательности данных, поступающих с течением времени. Соответственно Rx можно рассматривать как LINQ to Events (на базе IObservable<T>). Главное различие между наблюдаемыми объектами и другими провайдерами LINQ заключается в том, что Rx использует модель проталкивания, т.е. запрос определяет, как программа реагирует при поступлении событий. Rx строится на базе LINQ и добавляет новые мощные операторы как методы расширения.

В этой главе рассматриваются более типичные операции Rx. Помните, что все операторы LINQ тоже доступны, так что простые операции — фильтрация (Where), проекция (Select) и т.д. — на концептуальном уровне работают так же, как и с любым другим провайдером LINQ. Эти распространенные операции LINQ здесь не рассматриваются; мы сосредоточимся на новых возможностях, которые Rx добавляет к LINQ, особенно предназначенным для работы со временем.

Чтобы использовать System.Reactive, установите NuGet-пакет для System.Reactive в своем приложении.

6.1. Преобразование событий .NET

Задача

Имеется событие, которое должно интерпретироваться как входной поток System.Reactive, генерирующий данные через OnNext при каждом инициировании события.

Решение

Класс Observable определяет несколько преобразователей событий. Большинство событий фреймворка .NET совместимо с FromEventPattern, но, если ваши события не соответствуют общей схеме, используйте FromEvent.

FromEventPattern лучше всего работает с типом делегата события Event­Handler<T>. Этот тип делегата события используется во многих более новых фреймворках. Например, тип Progress<T> определяет событие ProgressChanged с типом EventHandler<T>, что позволяет легко упаковать его в FromEventPattern:

var progress = new Progress<int>();

IObservable<EventPattern<int>> progressReports =

    Observable.FromEventPattern<int>(

        handler => progress.ProgressChanged += handler,

        handler => progress.ProgressChanged -= handler);

progressReports.Subscribe(data => Trace.WriteLine("OnNext:

   " + data.EventArgs));

Отмечу, что data.EventArgs сильно типизован с типом int. Аргумент-тип FromEventPattern (int в приведенном примере) совпадает с типом T в EventHandler<T>. Два лямбда-аргумента FromEventPattern позволяют System.Reactive подписываться и отменять подписку на событие.

Более новые фреймворки пользовательского интерфейса используют EventHandler<T>, что позволяет легко использовать их из FromEventPattern, но более старые типы часто определяют уникальный тип делегата для каждого события. Они также могут использоваться с FromEventPattern, но это потребует несколько большей работы. Например, тип System.Timers.Timer определяет событие Elapsed, относящееся к типу ElapsedEventHandler. Подобные старые события можно упаковать в FromEventPattern:

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };

IObservable<EventPattern<ElapsedEventArgs>> ticks =

    Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(

        handler => (s, a) => handler(s, a),

        handler => timer.Elapsed += handler,

        handler => timer.Elapsed -= handler);

ticks.Subscribe(data => Trace.WriteLine("OnNext:

   " + data.EventArgs.SignalTime));

Обратите внимание: в этом примере data.EventArgs также имеет сильную типизацию. Аргументы-типы FromEventPattern теперь содержат уникальный тип обработчика и производный тип EventArgs. Первый лямбда-аргумент FromEventPattern содержит преобразователь Event­Handler<ElapsedEventArgs> в ElapsedEventHandler; преобразователь не делает ничего, кроме простой передачи события.

Синтаксис определенно становится неудобным. Ниже приведен другой вариант, использующий отражение (reflection):

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };

IObservable<EventPattern<object>> ticks =

    Observable.FromEventPattern(timer, nameof(Timer.Elapsed));

ticks.Subscribe(data => Trace.WriteLine("OnNext: "

    + ((ElapsedEventArgs)data.EventArgs).SignalTime));

При таком подходе вызов FromEventPattern выглядит намного проще. При этом у него есть один недостаток: потребитель не получает данные с сильной типизацией. Так как data.EventArgs относится к типу object, вам придется преобразовать его в ElapsedEventArgs самостоятельно.

Пояснение

События — распространенный источник данных для потоков System.Reactive. В этом рецепте рассматривается упаковка любых событий, соответствующих стандартной схеме события (в первом аргументе содержится отправитель, во втором — тип аргументов события). Даже если вы используете необычные типы событий, вы можете использовать перегруженные версии метода Observable.FromEvent, чтобы упаковать их в наблюдаемый объект.

Когда события упаковываются в наблюдаемый объект, OnNext вызывается при каждом инициировании события. Когда вы имеете дело с AsyncCompletedEventArgs, это может привести к неожиданному поведению, потому что любое исключение передается как данные (OnNext), а не как ошибка (OnError). Например, рассмотрим следующую обертку для WebClient.DownloadStringCompleted:

var client = new WebClient();

IObservable<EventPattern<object>> downloadedStrings =

    Observable.

    FromEventPattern(client, nameof(WebClient.DownloadStringCompleted));

downloadedStrings.Subscribe(

    data =>

    {

      var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;

      if (eventArgs.Error != null)

        Trace.WriteLine("OnNext: (Error) " + eventArgs.Error);

      else

        Trace.WriteLine("OnNext: " + eventArgs.Result);

    },

    ex => Trace.WriteLine("OnError: " + ex.ToString()),

    () => Trace.WriteLine("OnCompleted"));

client.DownloadStringAsync(new Uri("/"));

Когда WebClient.DownloadStringAsync завершается с ошибкой, иници­ируется событие с исключением в AsyncCompletedEventArgs.Error. К сожалению, System.Reactive воспринимает его как событие данных, так что при выполнении приведенного кода будет выведено сообщение OnNext: (Error) вместо OnError:.

Некоторые подписки и отмены подписки на события должны выполняться из определенного контекста. Например, подписка на события многих UI-элементов должна выполняться из UI-потока. System.Reactive предоставляет оператор для управления контекстом создания и отмены подписки: SubscribeOn. В большинстве случаев без оператора SubscribeOn можно обойтись, потому что обычно подписки, относящиеся к UI, со­здаются из UI-потока.

lemur.tiff

SubscribeOn управляет контекстом кода, в котором добавляются и удаляются обработчики событий. Не путайте с оператором ObserveOn, управляющим контекстом для уведомлений наблюдаемого объекта (делегатов, передаваемых Subscribe).

Дополнительная информация

В рецепте 6.2 рассматривается изменение контекста, в котором иниции­руются события.

В рецепте 6.4 рассматривается регулировка событий для предотвращения перегрузки.

6.2. Отправка уведомлений контексту

Задача

System.Reactive старается действовать по возможности потоково-нейтрально. Таким образом, уведомления (например, OnNext) будут выдаваться в том потоке, который окажется текущим. Все уведомления OnNext происходят последовательно, но небязательно в одном потоке.

Часто бывает нужно, чтобы эти уведомления выдавались в конкретный контекст. Например, все манипуляции с UI-элементами должны осуществляться в UI-потоке, которому принадлежат эти элементы, поэтому если вы обновляете пользовательский интерфейс в ответ на уведомление, поступившее в поток из пула потоков, необходимо перейти к UI-потоку.

Решение

System.Reactive предоставляет оператор ObserveOn для перемещения уведомлений к другому планировщику.

В следующем примере оператор Interval используется для создания уведомлений OnNext один раз в секунду:

private void Button_Click(object sender, RoutedEventArgs e)

{

  Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");

  Observable.Interval(TimeSpan.FromSeconds(1))

      .Subscribe(x => Trace.WriteLine(

          $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));

}

На моей машине вывод выглядит примерно так:

UI thread is 9

Interval 0 on thread 10

Interval 1 on thread 10

Interval 2 on thread 11

Interval 3 on thread 11

Interval 4 on thread 10

Interval 5 on thread 11

Interval 6 on thread 11

Так как Interval работает по таймеру (без привязки к конкретному потоку), уведомления будут выдаваться в потоке из пула потоков, а не в UI-потоке. Если вам потребуется обновить UI-элемент, вы можете направить эти уведомления через ObserveOn и передать контекст синхронизации, представляющий UI-поток:

private void Button_Click(object sender, RoutedEventArgs e)

{

  SynchronizationContext uiContext = SynchronizationContext.Current;

  Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");

  Observable.Interval(TimeSpan.FromSeconds(1))

      .ObserveOn(uiContext)

      .Subscribe(x => Trace.WriteLine(

          $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));

}

Также ObserveOn часто используется для выхода из UI-потока в случае необходимости. Допустим, нужно выполнять некоторые вычисления, создающие высокую нагрузку на процессор, при каждом перемещении мыши. По умолчанию все события перемещения мыши инициируются в UI-потоке, и с помощью ObserveOn вы можете переместить эти уведомления в поток из пула потоков, выполнить вычисления, а затем переместить уведомления о результатах обратно в UI-поток:

SynchronizationContext uiContext = SynchronizationContext.Current;

Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

        handler => (s, a) => handler(s, a),

        handler => MouseMove += handler,

        handler => MouseMove -= handler)

    .Select(evt => evt.EventArgs.GetPosition(this))

    .ObserveOn(Scheduler.Default)

    .Select(position =>

    {

      // Сложные вычисления

      Thread.Sleep(100);

      var result = position.X + position.Y;

      var thread = Environment.CurrentManagedThreadId;

      Trace.WriteLine($"Calculated result {result} on thread {thread}");

      return result;

    })

    .ObserveOn(uiContext)

    .Subscribe(x => Trace.WriteLine(

        $"Result {x} on thread {Environment.CurrentManagedThreadId}"));

Выполнив этот пример, вы увидите, что вычисления выполняются в потоке из пула потоков, а результаты выводятся в UI-потоке. Вы также заметите, что вычисления и результаты отстают от ввода; они помещаются в очередь, потому что местоположение мыши обновляется чаще, чем каждые 100 мс. В System.Reactive предусмотрено несколько возможных решений задачи; одно из них — регулировка ввода — рассматривается в рецепте 6.4.

Пояснение

В действительности ObserveOn перемещает уведомления планировщику System.Reactive. В этом рецепте рассматриваются планировщик по умолчанию (для пула потоков) и один способ создания UI-планировщика. Оператор ObserveOn чаще всего используется для перемещения в UI-поток и из него, но планировщики также могут пригодиться во многих других сценариях. Более сложный сценарий, в котором планировщики могут принести пользу, связан с имитацией прохождения времени при модульном тестировании (рассматривается в рецепте 7.6).

lemur.tiff

Оператор ObserveOn управляет контекстом для наблюдаемых уведомлений. Не путайте его с оператором SubscribeOn, который управляет контекстом для кода, добавляющего и удаляющего обработчики событий.

Дополнительная информация

В рецепте 6.1 рассматривается создание последовательностей на базе событий и использование SubscribeOn.

В рецепте 6.4 рассматривается регулировка потоков событий.

В рецепте 7.6 рассматривается специальный планировщик, используемый при тестировании кода System.Reactive.

6.3. Группировка данных событий с использованием Window и Buffer

Задача

Имеется последовательность событий, требуется группировать входящие события по мере их поступления. Другой пример: вы хотите реагировать на пары событий или на весь ввод в пределах двухсекундного окна.

Решение

System.Reactive предоставляет пару операторов для группировки входных последовательностей: Buffer и Window. Buffer сохраняет входные события до завершения группы, после чего передает их все сразу как коллекцию событий. Window логически группирует входные события, но передает их по мере поступления. Возвращаемым типом Buffer является IObservable<IList<T>> (поток событий коллекций), а возвращаемым типом WindowIObservable<IObservable<T>> (поток событий потоков событий).

В следующем примере оператор Interval используется для создания ежесекундных уведомлений OnNext, после чего буферизует их по два:

Observable.Interval(TimeSpan.FromSeconds(1))

    .Buffer(2)

    .Subscribe(x => Trace.WriteLine(

        $"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));

На моей машине этот код генерирует парный вывод каждые две секунды:

13: Got 0 and 1

15: Got 2 and 3

17: Got 4 and 5

19: Got 6 and 7

21: Got 8 and 9

В следующем примере Window используется для создания групп из двух событий:

Observable.Interval(TimeSpan.FromSeconds(1))

    .Window(2)

    .Subscribe(group =>

    {

      Trace.WriteLine($"{DateTime.Now.Second}: Starting new group");

      group.Subscribe(

          x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"),

          () => Trace.WriteLine($"{DateTime.Now.Second}: Ending group"));

    });

На моем компьютере этот пример с Window выдает следующий результат:

17: Starting new group

18: Saw 0

19: Saw 1

19: Ending group

19: Starting new group

20: Saw 2

21: Saw 3

21: Ending group

21: Starting new group

22: Saw 4

23: Saw 5

23: Ending group

23: Starting new group

Эти примеры показывают различия между Buffer и Window. Buffer ожидает всех событий в своей группе, а затем публикует одну коллекцию. Window группирует события аналогичным образом, но публикует события по мере поступления; Window немедленно публикует наблюдаемый объект, который публикует события для этого окна.

Как Buffer, так и Window работают с временными интервалами. В следующем примере все события перемещения мыши собираются в окнах продолжительностью в 1 секунду:

private void Button_Click(object sender, RoutedEventArgs e)

{

  Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

          handler => (s, a) => handler(s, a),

          handler => MouseMove += handler,

          handler => MouseMove -= handler)

      .Buffer(TimeSpan.FromSeconds(1))

      .Subscribe(x => Trace.WriteLine(

          $"{DateTime.Now.Second}: Saw {x.Count} items."));

}

В зависимости от того, как перемещается мышь, на экране должен появиться вывод следующего вида:

49: Saw 93 items.

50: Saw 98 items.

51: Saw 39 items.

52: Saw 0 items.

53: Saw 4 items.

54: Saw 0 items.

55: Saw 58 items.

Пояснение

Buffer и Window входят в число инструментов для подготовки ввода и придания ему нужной формы. Другой полезный прием — регулировка — рассматривается в рецепте 6.4.

У Buffer и Window существуют другие перегруженные версии, которые могут использоваться в более сложных сценариях. Перегруженные версии с параметрами skip и timeShift позволяют создавать группы, перекрываю­щиеся с другими группами, или пропускать элементы между группами. Также есть перегруженные версии, получающие делегатов, что позволяет динамически определять границы групп.

Дополнительная информация

В рецепте 6.1 рассматривается создание последовательностей из событий.

В рецепте 6.4 рассматривается регулировка потоков событий.

6.4. Контроль потоков событий посредством регулировки и выборки

Задача

Типичная проблема при написании реактивного кода — слишком быстрое поступление событий. Слишком быстрый поток событий может превысить возможности вашей программы.

Решение

System.Reactive предоставляет операторы, предназначенные специально для предотвращения «затопления» данными событий. Операторы Throttle и Sample предоставляют два разных способа контроля над быстро поступающими событиями ввода.

Оператор Throttle устанавливает скользящее окно тайм-аута. При поступлении входного события окно тайм-аута сбрасывается. По истечении окна тайм-аута публикуется значение последнего события, поступившего в границах окна.

Следующий пример отслеживает движения мыши и использует Throttle, чтобы сообщения об обновлениях выдавались только в том случае, если мышь оставалась неподвижной в течение секунды:

private void Button_Click(object sender, RoutedEventArgs e)

{

  Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

          handler => (s, a) => handler(s, a),

          handler => MouseMove += handler,

          handler => MouseMove -= handler)

      .Select(x => x.EventArgs.GetPosition(this))

      .Throttle(TimeSpan.FromSeconds(1))

      .Subscribe(x => Trace.WriteLine(

          $"{DateTime.Now.Second}: Saw {x.X + x.Y}"));

}

Вывод серьезно изменяется в зависимости от перемещений мыши, но один из результатов запуска на моем компьютере выглядел так:

47: Saw 139

49: Saw 137

51: Saw 424

56: Saw 226

Например, Throttle часто используется при автозаполнении: пользователь вводит текст в текстовом поле, и поиск должен начаться только после того, как пользователь завершит ввод.

Sample использует другой подход к контролю быстрых последовательностей. Sample устанавливает регулярный тайм-аут и публикует последнее значение в этом окне при каждом истечении тайм-аута. Если в период выборки не было получено ни одного значения, то за этот период результаты не публикуются.

Следующий пример отслеживает перемещения мыши и осуществляет их выборку с секундными интервалами. В отличие от примера с Throttle, в примере с Sample для просмотра данных не нужно держать мышь неподвижной в течение секунды:

private void Button_Click(object sender, RoutedEventArgs e)

{

  Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

          handler => (s, a) => handler(s, a),

          handler => MouseMove += handler,

          handler => MouseMove -= handler)

      .Select(x => x.EventArgs.GetPosition(this))

      .Sample(TimeSpan.FromSeconds(1))

      .Subscribe(x => Trace.WriteLine(

          $"{DateTime.Now.Second}: Saw {x.X + x.Y}"));

}

Вот как выглядел результат на моей машине, когда я сначала оставил мышь неподвижной на несколько секунд, а затем непрерывно двигал ее:

12: Saw 311

17: Saw 254

18: Saw 269

19: Saw 342

20: Saw 224

21: Saw 277

Пояснение

Регулировка и выборка — основные инструменты контроля чрезмерного потока ввода. Не забывайте, что вы можете легко выполнить фильтрацию стандартным оператором LINQ Where. Можно считать, что операторы Throttle и Sample похожи на Where, только они выполняют фильтрацию по временным окнам, а не по данным событий. Все три оператора помогают взять под контроль слишком быстрые входные потоки, но делают это разными способами.

Дополнительная информация

В рецепте 6.1 рассматривается создание последовательностей из событий.

В рецепте 6.2 рассматривается изменение контекста для выдачи событий.

6.5. Тайм-ауты

Задача

Ожидается, что событие поступит в течение определенного времени. Требуется обеспечить своевременную реакцию программы даже в том случае, если событие не поступит. Чаще всего подобные ожидаемые события являются одиночными асинхронными операциями (например, ожиданием ответа на запрос веб-службы).

Решение

Оператор Timeout устанавливает скользящее окно тайм-аута в своем входном потоке. При каждом поступлении нового события окно тайм-аута сбрасывается. Если тайм-аут истекает без получения события в окне, оператор Timeout завершает поток с уведомлением OnError, содержащим исключение TimeoutException.

Следующий пример выдает веб-запрос к условному домену и устанавливает тайм-аут продолжительностью в 1 секунду. Чтобы запустить веб-запрос, в коде используется ToObservable для преобразования Task<T> в IObservable<T> (см. рецепт 8.6):

void GetWithTimeout(HttpClient client)

{

  client.GetStringAsync("()

      .Timeout(TimeSpan.FromSeconds(1))

      .Subscribe(

          x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.Length}"),

          ex => Trace.WriteLine(ex));

}

Timeout идеально подходит для асинхронных операций (таких, как веб-запросы), но может применяться к любому потоку событий. В следующем примере Timeout применяется к перемещениям мыши, с которыми проще экспериментировать:

private void Button_Click(object sender, RoutedEventArgs e)

{

  Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

          handler => (s, a) => handler(s, a),

          handler => MouseMove += handler,

          handler => MouseMove -= handler)

      .Select(x => x.EventArgs.GetPosition(this))

      .Timeout(TimeSpan.FromSeconds(1))

      .Subscribe(

          x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"),

          ex => Trace.WriteLine(ex));

}

На своей машине я переместил мышь, а потом держал ее неподвижной в течение секунды, получив следующие результаты:

16: Saw 180

16: Saw 178

16: Saw 177

16: Saw 176

System.TimeoutException: The operation has timed out.

Учтите, что при отправке OnError исключения TimeoutException поток завершается и перемещения мыши перестают проходить. Возможно, такое поведение окажется нежелательным, поэтому у оператора Timeout существуют перегруженные версии, которые подставляют другой поток при возникновении тайм-аута вместо того, чтобы завершать поток с исключением.

Код следующего примера отслеживает перемещения мыши до возникновения тайм-аута. После тайм-аута код начинает отслеживать щелчки мышью:

private void Button_Click(object sender, RoutedEventArgs e)

{

  IObservable<Point> clicks =

      Observable.FromEventPattern<MouseButtonEventHandler,

        MouseButtonEventArgs>(

          handler => (s, a) => handler(s, a),

          handler => MouseDown += handler,

          handler => MouseDown -= handler)

      .Select(x => x.EventArgs.GetPosition(this));

  Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

          handler => (s, a) => handler(s, a),

          handler => MouseMove += handler,

          handler => MouseMove -= handler)

      .Select(x => x.EventArgs.GetPosition(this))

      .Timeout(TimeSpan.FromSeconds(1), clicks)

      .Subscribe(

          x => Trace.WriteLine($"{DateTime.Now.Second}: Saw

             {x.X},{x.Y}"),

          ex => Trace.WriteLine(ex));

}

На своей машине я переместил мышь, потом держал ее неподвижной в течение секунды, а затем щелкнул в паре разных точек. Из следующих результатов видно, что до наступления тайм-аута мышь быстро перемещалась, после чего последовали два щелчка:

49: Saw 95,39

49: Saw 94,39

49: Saw 94,38

49: Saw 94,37

53: Saw 130,141

55: Saw 469,4

Пояснение

Оператор Timeout играет важную роль в нетривиальных приложениях, потому что в идеале ваша программа всегда должна реагировать на происходящее, что бы ни происходило вокруг. Он особенно полезен при выполнении асинхронных операций, но также может быть применен к любому потоку событий. Обратите внимание: используемая операция не отменяется; в случае тайм-аута она продолжит выполнение до тех пор, пока не завершится успехом или неудачей.

Дополнительная информация

В рецепте 6.1 рассматривается создание последовательностей на базе событий.

В рецепте 8.6 рассматривается упаковка асинхронного кода как наблюдаемого потока событий.

В рецепте 10.6 рассматривается отмена подписки на последовательности в результате CancellationToken.

В рецепте 10.3 рассматривается использование CancellationToken для тайм-аута.

Назад: Глава 5. Основы Dataflow
Дальше: Глава 7. Тестирование