четверг, 25 ноября 2010 г.

“Реактивные расширения” и асинхронные операции

  • Прежде чем переходить непосредственно к работе с асинхронными операциями с помощью библиотеки Reactive Extensions (или Rx), важно понять причины возникновения этой библиотеки, и какие основные принципы положили разработчики в ее основу.

    Одним из наиболее типичных отношений между двумя классами является отношение использования (“uses a” relationship), когда один класс используют функциональность других классов для решения своих задач. Мы с подобным отношением сталкиваемся ежедневно, даже не задумываясь о нем: наши классы используют строки, целые числа, классы работы с консолью, сетью, файлами и другими ресурсами. Говорят, что объект класса A взаимодействует (interact) с объектом класса B и получает (вытягивает, pulls) у него необходимые данные; такая модель взаимодействия называется pull-моделью (или интерактивной моделью) (рисунок 1а). С другой стороны, часто возникает ситуация, когда объект класса A не знает, когда будут доступны необходимые ему данные в классе B, и в таком случае гораздо удобнее, чтобы объект класса B «сказал» об этом самостоятельно и «вытолкнул» (push) некоторые данные, когда они станут доступны. В этом случае говорят, что объект класса А реагируют (react) на возникшие событие и соответствующая модель называется push-моделью или реактивной моделью (рисунок 1б) [Meijer RX1].

     Rx1
    Рисунок 1а. Класс A взаимодействует с классом B

     Rx2
    Рисунок 1б. Класс A реагирует на события от класса B

    ПРИМЕЧАНИЕ
    У реактивной (или push-модели) существует и третье название: «Принцип Голливуда» (Hollywood Principle) – «Не звоните нам, мы сами вам позвоним» (Don't call us, we'll call you) [Syme 2010].

    Даже если вы никогда в жизни не слышали ни одно из трех названий реактивной модели программирования, работая с платформой .Net вы сталкиваетесь с ней постоянно. Типичными представителями этой модели являются события (events) и модель асинхронного программирования (Asynchronous Programming Model, APM), когда окончание выполнения метода происходит асинхронно, после завершения некоторой операции, а также в других проявлениях паттерна «Наблюдатель».

    Начиная с версии 3.0 в языке C# появился LINQ – замечательная возможность, которая здорово упростила нашу жизнь при решении самых разных задач. В центре этой библиотеки находится pull-модель и интерфейс IEnumerable<T>, однако до сих пор ничего подобного не было предложено для работы с push-моделью. Именно эту нишу заняла библиотека Rx, которая призвана не просто упростить работу с событиями и асинхронными операциями, она предоставляет унифицированный доступ к ним и позволяет повторно использовать весь существующий опыт и знания, которые вы накопили, работая с LINQ. Я думаю, что теперь слово «реактивный», который внимательный читатель мог заметить в названии темы и, собственно, библиотеки (напомню, она называется Reactive Extensions), станет более осмысленным – эта библиотека призвана упростить код, реагирующий на события, происходящие в других частях вашей системы или во внешнем мире.

    Двойственность интерфейсов

    Давайте рассмотрим типичный процесс использования паттерна «Итератор» в языке C#. Итак, нам нужен «итерируемый» объект, реализующий интерфейс IEnumerable<T>. Этот интерфейс содержит всего лишь один метод: GetEnumerator, возвращающий непосредственно итератор (объект, реализующий интерфейс IEnumerator<T>). Сам же процесс итерирования тоже выглядит весьма просто: для получения текущего элемента, на который указывает итератор,  достаточно обратиться к свойству Current, а для перехода к следующему элементу – вызвать метод MoveNext. Итерирование элементов завершается, когда метод MoveNext возвращает false. Хотя никто нам не мешает использовать итераторы таким образом, обычно мы пользуемся более высокоуровневыми конструкциями, такими как foreach.

    public interface IEnumerable<T>
    {
        IEnumerator<T> GetEnumerator();
    }

    public interface IEnumerator<T> : IDisposable
    {
        T Current { get; }
        bool MoveNext();
        // Метод Reset удален за ненадобностью
    }

    Итераторы являются типичными представителями pull-модели, когда процессом перебора последовательности управляет вызывающий код. Типичным же представителем «реактивного» программирования (или push-модели) является паттерн «Наблюдатель», основная идея которого заключается в предоставлении интерфейса «обратного вызова» с помощью которого наблюдаемый объект уведомляет наблюдателей о произошедших событиях. Библиотека Rx построена на базе двух таких интерфейсов: IObservable<T> (интерфейс наблюдаемого объекта) и IObserver<T> (интерфейс наблюдателя). Эти интерфейсы являются частью .Net Framework 4.0 (однако оставшуюся часть библиотеки вам придется скачать самостоятельно) и выглядят следующим образом:

    public interface IObservable<T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }
    public interface IObserver<T>
    {
        void OnNext(T value);
        void OnCompleted();
        void OnException(Exception error);
    }

    Использование этих интерфейсов следующее: наблюдаемый класс реализует интерфейс IObservable<T> с единственным методом Subscribe. В этом методе он принимает интерфейс наблюдателя и сохраняет ссылку на него в своем внутреннем списке подписчиков. Это звучит достаточно знакомо: во-первых, это очень похоже на классический паттерн «Наблюдатель», описанный в «банде четырех», да и работа с событиями в .Net выполняется аналогично: мы подписываемся на некоторые события, когда в них возникает необходимость и отписываемся от них, когда такая необходимость завершается. Но если для событий предусмотрены два отдельных метода (методы Add и Remove или операторы += и -=), то в случае с наблюдателями было принято другое решение: что если вместо хранения нужного экземпляра интерфейса наблюдателя и передачи его в метод Unsubscibe, мы вернем disposable объект из метода Subscibe? Это позволяет использовать анонимные делегаты для обработки событий и не требует создание экземпляров анонимного метода для последующей отписки, поскольку вызывающий код отписывается от событий путем вызова метода Dispose, а не путем передачи делегата. Причем если вызывающему коду не нужна функциональность отписки от событий (что справедливо в большинстве случаев), то он может проигнорировать возвращаемое значение и не делать никаких дополнительных действий. В мире .Net отсутствие вызова метода Dispose выглядит как серьезнейший “code smell”, однако в этом случае это совершенно нормально: по сути, вызов метода Dispose играет роль оператора break в блоке foreach.

    С первого взгляда интерфейсы наблюдателей и итераторов имеют мало общего, но если присмотреться внимательнее, то двойственность этих интерфейсов увидеть не столь сложно. Эрик Мейер [Meijer RX1] и Барт де Смет [Bart 2010] показали математическую двойственность этих интерфейсов, но даже не забираясь в такие дебри мы можем увидеть их семантическое сходство (таблица 1).

    Enumerable    Observable    Примечание
    IEnumerable<T>.
    GetEnumerator()
    IObservable<T>.
    Subscribe()
    Получение итератора/подписка на события наблюдаемого объекта.
    IEnumerator<T>.
    Current
    1) IObserver<T>.
    OnNext()
    2) IObserver<T>.
    OnException()

    Получить (и обработать) очередной элемент. Свойство IEnumerator<T>.Current играет две роли: (1) получение текущего элемента; (2) генерация исключения, в случае ошибки. Поэтому для простоты и ясности это свойство разбито на два метода в интерфейсе IObservable<T>.

    IEnumerator<T>.
    MoveNext()
    IObserver<T>.
    OnComplete()

    IEnumerator<T>.MoveNext также выполняет две роли: (1) переместить итератор на следующий элемент последовательности; (2) сообщить пользовательскому коду, что итератор достиг конца последовательности. Поскольку мы не можем (и не должны) явным образом «перебирать» элементы при использовании IObservable<T> (ведь это реактивная модель и наблюдаемый объект  сам нам говорит о том, что у нас получен очередной элемент путем вызова метода IObservable<T>.OnNext), то наблюдателю нужен только один дополнительный метод, который бы сообщал о том, что наблюдаемая последовательность завершена именно эту роль играет метод IObservable<T>.OnComplete.

    IEnumerable<T>.
    Dispose()
    IDisposable IObservable<T>.
    Subscibe()

    Хотя и Enumerable, и Observable имеют какое-то отношение к интерфейсу IDisposable, понятие «очистки ресурсов», которое за ним скрывается, существенно отличается. Так IEnumerable<T>.Dispose предназначен для очистки ресурсов, используемых в блоке итераторов (Iterator Block) и отсутствие вызова этого метода может привести к серьезным неприятностям (например, не будет вызван блок finally внутри блока итераторов). Однако объект, возвращаемый при вызове метода Subscibe, предназначен лишь для «отписки» наблюдателя от наблюдаемого объекта; этот вызов играет роль оператора break при итерировании элементов внутри блока foreach.

    Таблица 1. Сравнение интерфейсов итераторов и наблюдателей

    ПРИМЕЧАНИЕ
    Однако не стоит забывать, что без отписки от событий путем вызова метода Dispose, ссылка на ваш объект остается во внутреннем списке наблюдаемого объекта и может предотвратить уничтожение вашего объекта сборщиком мусора.

    Простой пример использования Rx

    В библиотеке Rx класс Observable играет роль, аналогичную классу Enumerable в LINQ 2 Objects, поэтому большую часть времени вы будете работать именно с ним. Кроме того, многие методы в классе Observable аналогичны методам из класса Enumerable, так что, практически все, что вы могли делать в LINQ 2 Objects доступно и в Rx.

    Давайте начнем с простого примера:

    IObservable<int> range = from i in Observable.Range(1, 10)
                                where i % 2 == 0
                                select i;
    range.Subscribe(i => Console.WriteLine("Next element: {0}", i),
        e => Console.WriteLine("Error: {0}", e.Message),
        () => Console.WriteLine("Range observation complete"));

    В данном случае метод Observable.Range возвращает интерфейс IObservable<int>, к которому мы применяем привычный LINQ синтаксис фильтрации, в результате чего получаем «наблюдаемую» последовательность, содержащую только четные элементы. Однако, аналогично классическому LINQ 2 Objects, сам LINQ-запрос не приводит к исполнению чего-либо; для получения уведомлений от наблюдаемого объекта необходимо подписаться на уведомления путем вызова метода Subscribe. (Напомню, что в классическом LINQ исполнение кода также является «отложенным» (lazy) и не выполняется до перебора (или потребления) последовательности или до вызова определенных методов, таких как Count, ToList и других.) Метод Subscibe класса Observable содержит ряд перегруженных версий, начиная от версии, принимающей IObserver<T> (в данном случае IObserver<int>), заканчивая версиями, которые принимают функции обратного вызова для методов OnNext, OnError и OnComplete. Если использование методов обратного вызова по какой-то причине не подойдет, то вы можете реализовать интерфейс IObservable<T> руками или воспользоваться методом Create класса Observer.

    Результат выполнения этого фрагмента кода следующий:

    Next element: 2
    Next element: 4
    Next element: 6
    Next element: 8
    Next element: 10
    Range observation complete

    ПРИМЕЧАНИЕ
    В сети находится большое количество примеров использования библиотеки Rx; первое, на что стоит обратить внимание – это 101 Rx Sample, помимо этого, Ли Кэмпбелл (Lee Campbell) в своем блоге опубликовал 7 постов, которые хоть и не покрывают весь функционал библиотеки, но тоже являются отличной отправной точкой: Reactive Extensions for .NET an Introduction.

    Обработка событий пользовательского интерфейса

    Получение четных значений в диапазоне от 1 до 10 с их последующей обработкой в лямбда-выражении является отличной возможностью, которая скрасит ваши серые программистские будни, однако этого явно недостаточно, чтобы влюбиться в эту библиотеку с первого взгляда. Поэтому давайте рассмотрим более реальный пример обработки событий от пользовательского интерфейса с помощью библиотеки Rx.

    События в .Net по своей природе являются частным случаем паттерна «Наблюдатель» и push-модели программирования, поэтому с помощью Rx мы можем преобразовать события в push-коллекции и рассматривать их, как поток данных, к которому можно применять привычные операции фильтрации и преобразования, привычные по использованию в LINQ.

    Давайте предположим, что нам нужно отслеживать события двойного клика мыши в заданном регионе экрана, причем нам необходимо контролировать интервал между нажатиями кнопки мыши, чтобы считать их двойным кликом. (Эта задача может показаться высосанной из пальца, однако у нее есть и реальное практическое применение: в Silverlight такого понятия как двойной клик нет и его нужно как-то получать самостоятельно). Если бы мы могли рассматривать стандартные события от нажатия кнопки мыши как некоторую последовательность точек и интервалов времени между ними, то мы могли бы воспользоваться LINQ выражением для решения нашей задачи:

    var mouseDoubleClickEvents = from e in this.MouseDown
                                    where rectangle.Contains(e.X, e.Y) &&
                                        e.Interval.TotalMilliseconds < 500
                                    select new { e.X, e.Y, e.Interval };

    где rectangle – это некоторый объект типа Rectangle, задающий необходимый регион экрана, а свойство Interval типа Timespan содержит время, прошедшее с момента возникновения предыдущего события.

    Очевидно, что этот код не будет компилироваться, поскольку мы не можем применять LINQ-запросы к простым событиям без дополнительных танцев с бубном. Однако мы можем добиться этого с помощью Rx:

    var eventsAsObservable = (from move in Observable.FromEvent<MouseEventArgs>(this, "MouseDown")
                               select new { move.EventArgs.X, move.EventArgs.Y }).TimeInterval()
                               .Where(e => rectangle.Contains(e.Value.X, e.Value.Y) &&
                                            e.Interval.TotalMilliseconds < 500);
    eventsAsObservable.Subscribe(e =>
        Console.WriteLine("Double click: X={0}, Y={1}, Interval={2}",
        e.Value.X, e.Value.Y, e.Interval));

    Давайте разберем этот код по строкам. Метод Observable.FromEvent<MouseEventArgs>(this, “MouseDown”), возвращает IObservable<MouseEventArgs> - push-коллекцию событий мыши, которые будут «выталкиваться» при нажатии кнопки мыши пользователем. Оператор select new {move.EventArgs.X, move.EventArgs.Y} возвращает IObserver анонимного типа, который содержит пару свойств: X и Y. Метод расширения TimeInterval класса Observable добавляет поле типа TimeSpan для каждого элемента observable-коллекции и содержит время между предыдущим и текущим элементом (в данном случае – это время между кликами мыши). Далее идет достаточно привычный метод расширения Where, который делает именно то, что от него ожидается: фильтрует элементы push-коллекции в случае невыполнения указанного предиката (т.е. отбрасывает элементы, для которых лямбда-выражение вернет false).

    Как уже говорилось выше, класс Observable содержит несколько методов расширения, которые принимают функции обратного вызова для соответствующих методов интерфейса IObserver<T>; в данном случае нас интересует только метод OnNext, поэтому остальные делегаты мы просто не указываем. Это устраняет необходимость в реализации интерфейса IObserver<T> руками и делает код более читабельным.

    В результате, если этот код скопировать в конструктор формы после метода InitializeComponent или в обработчик события FormLoad, то мы будем получать вывод на консоль, при двойном клике мышки в определенной области экрана.

    Работа с асинхронными операциями

    Как уже было сказано в предыдущем посте, модель асинхронного программирования (Asyncronous Programming Model, APM) на платформе .Net представлена парой методов: BeginXXX, EndXXX. Метод BeginXXX лишь инициирует асинхронную операцию и возвращает управление сразу же, при этом метод EndXXX обычно вызывается уже после завершения асинхронной операции и возвращает результаты ее выполнения. Поскольку окончание асинхронной операции происходит асинхронно, то по сути APM является частным случаем push-модели программирования.

    Библиотека Rx содержит ряд вспомогательных методов, которые упрощают использование асинхронной модели программирования. Давайте рассмотрим следующий пример, в котором создается делегат типа Func<int, int, int>, принимающий два целочисленных параметра и возвращающий их сумму. Затем мы вызываем этот делегат синхронно, асинхронно и с помощью расширений библиотеки Rx:

    // Объявляем делегат, который принимает два параметра
    // целочисленного типа и возвращает их сумму
    Func<int, int, int> add = (_x, _y) => _x + _y;
    int x = 1, y = 2;
    // Вызываем делегат синхронно
    int syncResult = add(1, 2);
    Console.WriteLine(@"Synchronous call function add({0}, {1}): result = {2},
                CurrentThreadId = {3}", x, y, syncResult,
                                      Thread.CurrentThread.ManagedThreadId);
    // Вызываем делегат с помощью APM
    add.BeginInvoke(x, y, ar =>
        {
            var asyncResult1 = add.EndInvoke(ar);
            Console.WriteLine(@"Asynchronous call function add({0}, {1}): result = {2},
                 CurrentThreadId = {3}", x, y, asyncResult1,
                                       Thread.CurrentThread.ManagedThreadId);
        }, null);
               
    // Мы можем рассматривать синхронную версию делегата типа
    // Func<T1, T2, T3> следующим образом: Func<T1, T2, IObservable<T3>>.
    // Таким образом мы преобразуем тип T3 возвращаемого значения синхронного
    // делегата в IObservable<T3> (т.е. результат будет "вытолкнут" после завершения
    // асинхронной операции)
    Func<int, int, IObservable<int>> obvervableAdd = add.ToAsync();
    IObservable<int> result = from added in obvervableAdd(x, y)
                                select added;
    result.Subscribe(r => Console.WriteLine(@"Observable result for function add({0}, {1}):
                          result = {2}, CurrentThreadId = {3}", x, y, r,
                                        Thread.CurrentThread.ManagedThreadId));

    Результат выполнения этого кода следующий:

    Synchronous call function add(1, 2): result = 3, CurrentThreadId = 10
    Asynchronous call function add(1, 2): result = 3, CurrentThreadId = 7
    Observable result for function  add(1, 2): result = 3, CurrentThreadId = 7

    Как мы видим, для работы с асинхронным делегатом в LINQ-стиле достаточно воспользоваться методом расширения Observable.ToAsync, который преобразовал тип возвращаемого значения делегата из типа int к типу IObservable<int>.

    Теперь давайте вернемся к более сложному примеру, рассмотренному в предыдущей статье: нам необходимо одновременно обратиться к трем веб-страницам (при этом сделать это с помощью APM), получить некоторый результат, который затем необходимо сохранить в файл, опять-таки, асинхронно. Чтобы решить эту задачу с помощью Rx нам понадобиться написать несколько методов расширения, которые преобразуют асинхронные операции над потоком (экземпляром класса Stream) и веб-запросом (экземпляром класса WebRequest) к observable-коллекциям. Для этого нам понадобятся два метода расширения:

    // Метод расширения, преобразующий асинхронное получение WebResponse в
    // IObservable<WebResponse> (в push-коллекцию)
    public static IObservable<WebResponse> GetResponseAsync(this WebRequest webRequest)
    {
        return Observable.FromAsyncPattern<WebResponse>(webRequest.BeginGetResponse,
            webRequest.EndGetResponse)();
    }
     
    // Метод расширения, преобразующий асинхронную запись строки в поток
    // к IObservable<Unit>.
    public static IObservable<Unit> WriteAsync(this Stream stream, string value)
    {
        return Observable.FromAsyncPattern<byte[], int, int>(stream.BeginWrite, stream.EndWrite)
            (UnicodeEncoding.Default.GetBytes(value), 0,
            UnicodeEncoding.Default.GetByteCount(value));
    }

    Методы достаточно простые, они преобразуют асинхронный вызов соответствующего метода в observable-коллекцию, тип которой соответствует результату выполнения асинхронной операции. С методом WebRequest.EndGetResponse все просто, он возвращает объект класса WebResponse, а вот с методом Stream.EndWrite все несколько сложнее, ведь у него нет возвращаемого значения. Поскольку мы не можем реализовать интерфейс IObservable<void>, разработчики библиотеки Rx добавили тип Unit, который как раз и играет роль отсутствия возвращаемого значения.

    На основе приведенных методов расширения достаточно просто решить нашу задачу (кода не мало, но он не такой уж сложный).

     

    var urls = new string[] { "http://rsdn.ru", "http://gotdotnet.ru", "http://blogs.msdn.com" };

    // Получаем push-коллекцию анонимных классов, которые представляют
    // собой пару Url и соответствующий WebResponse
    var observableWebResponses = from url in Observable.ToObservable(urls)
                let request = WebRequest.Create(url)
                from response in request.GetResponseAsync()
                select new { Url = url, response.ContentLength };

    var aggregatedResults = observableWebResponses
        // получаем "интервал" между каждым новым "событием"
        .TimeInterval()
        // агрегируем все результаты и получаем единую строку
        .Aggregate(
            // создаем начальное значение (seed) агрегации
            new StringBuilder(),
            (sb, r) =>
                {
                    // добавляем полученное значение в агрегат
                    sb.AppendFormat("{0}: {1}, interval {2}ms",
                                    r.Value.Url, r.Value.ContentLength,
                                    r.Interval).AppendLine();
                    // возвращаем измененный агрегат
                    return sb;
                });

    try
    {
        // Вызов метода First приведет к ожиданию завершения всех асинхронных
        // операций и получению объединенного результата.
        // Кроме того, если произойдет исключение при выполнении одной из
        // асинхронных операций, то мы его перехватим здесь, а не "упадем"
        // с необработанным исключением, возникшем в рабочем потоке.
        // Если дожидаться завершения операции не нужно, то все дополнительные
        // действия (такие, как запись результата в файл) можно выполнить
        // в функции Subscibe
        var requestsResult = aggregatedResults.First().ToString();
        Console.WriteLine("Aggregated web request results:{0}{1}", Environment.NewLine, requestsResult);
                   
        // Сохраняем результат выполнения всех операций используя
        // "безопасную", с точки зрения управления ресурсов, функцию
        // Observable.Using, которая закроет файл автоматически при завершении
        // асинхронной операции
        Observable.Using(() => new FileStream("d:\\results.txt", FileMode.Create,
                        FileAccess.Write, FileShare.Write),
                fs => fs.WriteAsync(requestsResult)).First();
    }
    catch(WebException e)
    {
        Console.WriteLine("Error requesting web page: {0}", e.Message);
    }
    catch(IOException e)
    {
        Console.WriteLine("Error writing results to file: {0}", e.Message);
    }

    С первого взгляда этот код может выглядеть несколько пугающе, но он нормально структурирован и комментарии играют свое доброе дело. Работа с асинхронными операциями с помощью Rx упрощает дело, но все же не делает ее столь же простой, как и работу с синхронными операциями. В C# 5.0 появится поддержка асинхронности на уровне языка, что позволит изменить код с синхронного на асинхронный путем добавления нескольких ключевых слов. В этом случае код будет действительно простым, но подробнее об этом уже в следующий раз.

    Вместо заключения

    Библиотека Rx это не такой уж маленький и простенький зверек, которого можно одолеть на одном десятке страниц, так что я даже не пытался рассказать обо всех возможностях (для этого нужна не статья, а небольшая книга). У этой библиотеки действительно богатая функциональность, которая может, как упростить решение (как в примере с событиями и UI), так и сделать его сложным для чтения, сопровождения и отладке (нагромождение вложенных лямбд вряд ли будет способствовать радостным эмоциям у ваших коллег при работе с вашим кодом). Так что, как и с любым другим инструментом, здравый смысл должен уберечь вас от впихивания новой штуки, о которой вы только что узнали, куда ни попадя, а пользоваться им с умом тогда, когда это действительно необходимо.

    В следующий раз:  C# 5.0 и поддержка асинхронного программирования на уровне языка.

    Дополнительные ссылки

    1. [Syme 2010] Don Syme, Tomas Petricek, Dmitry Lomov. The F# Asynchronous Programming Model
    2. Matthew Podwysocki. Reactive Extentions

    13 комментариев:

    1. Классно выглядит.

      Ты в своих проектах это используешь?

      Видимо такие коллекции больше применимы в Silverlight? Или есть другие удачные способы использования?

      ОтветитьУдалить
    2. У нас в вообще эта штука в warehouse-е используется (точнее в процессе ETL-я - переноса данных в хранилище). Но там она используется скорее для упрощения работы с многопоточностью нежели по прямому назначению.
      Ну и в паре мест в WinForms прикручено. Так что пока обширным продакшн использованием похвастаться не могу.

      ОтветитьУдалить
    3. Если не секрет, зачем
      IObservable result = from added in obvervableAdd(x, y) select added; такое присваивание. Почему нельзя присвоить obvervableAdd(x, y)?

      ОтветитьУдалить
    4. Отличная статья. Спасибо, понравилось.
      Поправьте только -
      "Итераторы являются типичными представителями pull-модели, ко'р'да процессом перебора"

      ОтветитьУдалить
    5. var observableWebResponses = from url in Observable.ToObservable(urls)
      let request = WebRequest.Create(url)
      from response in request.GetResponseAsync()
      select new { Url = url, response.ContentLength };

      Можете прокомментировать, зачем нужно urls оборачивать в Observable? Почему не создать сразу коллекцию WebRequest-ов, а уже из неё сделать SelectMany request.GetResponseAsync() ?

      ОтветитьУдалить
    6. 2Roman:
      Rонечно можно использовать просто observableAdd(x, y). Просто этот пример показывает использование выражения запросов (query comprehensions) совместно с RX.

      2eugene:
      Cпасибо за спасибо:) Очепятку исправил.

      2Андрей:
      Можно сразу не оборачивать url-ы в Observable коллекцию, но тогда код не такой красивый получится, поскольку не удастся весь его впихнуть в query comprehension. Так что в принципе, разницы нет никакой. Просто дело вкуса, какой синтаксис больше нравится.

      ОтветитьУдалить
    7. Сергей, мне право не удобно, не хочу выглядеть педантом, но в табличке тоже не все хорошо :(. OnNext, OnCompleted описаны в IObserver, а не в IObservable. Я не возражаю, если коммент почистите. Просто не все в теме, а табличка может немного запутать....

      ОтветитьУдалить
    8. 2eugene: спасибо огромное! Это очепятка. А удалять ваш комментарий совсем не стоит!

      :bear:

      ОтветитьУдалить
    9. Спасибо за статью, Сергей!

      Я вот только не согласен со следующим утверждением: "Однако объект, возвращаемый при вызове метода Subscibe, предназначен лишь для «отписки» наблюдателя от наблюдаемого объекта; этот вызов играет роль оператора break при итерировании элементов внутри блока foreach."

      На самом деле IDisposable играет ту же роль - освобождение ресурсов. Можно рассматривать факт подписки на IObservable как некий ресурс, который следует гарантировано освобождать - это раз. Никто не запрещает реализации IObservable вместе с вызовом Subscribe ещё и создавать какие-нибудь ресурсы, требующие детерминированного высвобождения, точно так же как в итераторах - это два (например, ЛюбойИтераторСРесурсами().AsObservable()). Я думаю не стоит читателю преподавать отписку через IDisposable как некоторую особенность IObservable, лучше просто наоборот упомянуть дуальность в плане управления ресурсами через IDisposable и просто упомянуть что подписка сама по себе является ресурсом.

      ОтветитьУдалить
    10. 2Александр (a.k.a. Пельмешко)
      Хм... А насколько логично очищать какие-то ресурсы при отписке одного из подписчиков? По идее нужно это делать, только когда этих подписчиков вообще не останется.

      Но с другой стороны, откуда наблюдаемому объекту знать, что к нему через секунду не подпишется снова? В таком случае он преждевременно завершит пушить свои элементы наблюдателям, хотя в них кто-то еще может нуждаться (пусть и не сейчас).
      Кроме того, ведь наблюдатель сам знает, когда последовательность иссякнет и самостоятельно вызовет finally блок (если говорить о блоке finally внутри итератора).

      Я эту штуку не проверял, а скорее у кого-то увидел, то ли у Скита, то ли у Podwysock-а, точно не помню.

      Саш, а ты можешь привести пример, когда это важно?
      А то я что-то сходу не смог сварганить пример, который бы четко показывал необходимость в этом...

      З.Ы. С точки зрения управления памятью я сноску сделал, поскольку если наблюдаемый объект будет жить вечно и твоя ссылка будет у него висеть в списке наблюдателей, то твой объект тоже сборщик не съест. А вот с точки зрения управления ресурсами пока не въехал, где может быть проблема :(

      ОтветитьУдалить
    11. 2Сергей:
      Ресурсы не обязательно должны разделяться между всеми подписчиками, они вполне могут выделяться на каждого подписчика индивидуально.

      Если будешь продолжать писать про Rx, то думаю стоит посвятить пост различию между "холодными" и "горячими" реализациями IObservable, в этом действительно много отличий от IEnumerable. Практически все реализации IEnumerable не имеют side-effects в реализации GetEnumerator(), а вот в случае Rx, реализации Subscribe() достаточно часто запускают какой-нибудь асинхронный процесс (холодные IObservable, например Observable.Range, Observable.Interval), но не всегда (горячие IObservable, например из обычных CLI-событий).

      Вот именно у холодных IObservable при каждом факте подписки могут выделяться ресурсы, которые должен подчищать либо сам IObservable по завершению асинхронной операции и отправке OnCompleted / OnError, либо пользователь снаружи по Dispose().

      ОтветитьУдалить
    12. А можно ли пример с обращениями к веб-страницам переписать таким образом, чтобы в случае возникновения ошибки при асинхронном запросе по одному из url-ов, процесс выполнения запросов не прерывался, а, скажем, текст сообщения об ошибке агрегировался с остальными ответами?

      ОтветитьУдалить
    13. Последний пример зависает, на обработке третьего url, причём не важно в какой последовательности и что за url.
      Очень странно, проверял на двух машинах.
      .net 4.0 RX 1.0.10621

      ОтветитьУдалить