воскресенье, 24 октября 2010 г.

Асинхронные операции и AsyncEnumerator

  • Все знают, что синхронное выполнение длительных операций – это плохо. Это плохо по многим причинам, начиная с того, что синхронные решения приводят к подвисанию пользовательского интерфейса, плохой масштабируемости и расширяемости, заканчивая тем, что вы не сможете воспользоваться всеми возможностями многоядерного процессора даже вашего домашнего компьютера, не говоря уже за преимущества асинхронного ввода/вывода, эффективность которого проявляется даже на одноядерных машинах. О теме асинхронности и многопоточности говорят на каждом шагу, начиная от небезызвестных товарищей вроде Джеффри Рихтера, Джо Даффи или Джозефа Албахари, заканчивая множеством статей на каждом втором техническом сайте и обязательным вопросом на собеседовании типа «А сколько вы знаете способов выполнить операцию асинхронно?».

    Авторы книг и статей, любят эту тему, потому что она обширная и сложная, да в последнее время еще и модная. Интервьюеры же любят ее за то, что вариантов ответа на этот вопрос в контексте платформы .Net много, и у каждого интервьюера может быть свой любимый способ доказать соискателю, что тот чего-то все же не знает. С точки зрения простого программиста использование всех современных инструментов для работы с многопоточностью, несмотря на все обещания их разработчиков, все еще остается делом непростым и легко может привести к вывиху мозга, а малейшая неосторожность – к отстрелу нескольких ценных конечностей. Одной из причин, почему так происходит, является то, что вся поддержка многопоточности в языке C# заканчивается оператором lock, а все остальное прикручено к нему благодаря сторонним (или не очень) библиотекам, начиная с BCL и RX, заканчивая PowerThreading и другими велосипедами, различного вида и формы (хотя, конечно же, главная причина заключается в том, что тема, эта сама по себе, не просто сложная, а очень сложная, и все попытки ее упростить настолько, чтобы она была понятна домохозяйкам, успехом не увенчалась, и едва ли увенчается когда-либо).

    Я же никакие свои велосипеды изобретать не собираюсь, а сосредоточусь лишь на различных способах работы с асинхронной моделью программирования (AsynchronousProgrammingModel, APM) на платформе .Net на примере идеи, предложенной Джеффри Рихтером с его классом AsyncEnumerator.

    Синхронное и асинхронное выполнение операций

    Существует несколько различных точек зрения на то, как лучше всего хорошенько разобраться в чем-то новом. Классическая теория считает, что для того, чтобы лучше изучить лягушку, ее следует разрезать и тщательно исследовать ее внутренности, ну а другая точка зрения заключается в том, что лучше ее не резать, а изобрести заново. Если вам больше импонирует первая точка зрения, то никто вам не мешает скачать исходники PowerThreading прямо сейчас и приступить к изучению его внутренностей, ну а если вам нравится второй подход (ну или, по крайней мере, вы готовы попробовать подойти к изучению с другой стороны), то давайте «изобретем» AsyncEnumerator прямо сейчас.

    Для начала нужно ответить на старый как советский пылесос «Пионер» вопрос: зачем нам вообще нужна вся эта асинхронность, чем нам не хватает старых добрых синхронных операций, ну или максимум асинхронного выполнения операции с помощью того же пула потоков.

    Очень популярным примером выполнения асинхронных операций является обращение к какому-нибудь внешнему ресурсу типа веб-сервиса, доступ к веб-странице или асинхронный доступ к файлу. И хотя в качестве примера с тем же успехом можно использовать метод с телом вида Sleep(5000), сделаем вид, что мы стараемся решить настоящую задачу, например, сварганить свой собственный веб-браузер с поддержкой нескольких вкладок.

    Итак, давайте рассмотрим синхронную версию функции, которая обращается к нескольким веб-страницам и выводит на экран длины полученных ответов (не слишком функционально для настоящего веб-браузера, но нужно же с чего-то начинать):

    static void SyncVersion()
    {
        Stopwatch sw = Stopwatch.StartNew();
        string url1 = "http://rsdn.ru";
        string url2 = "http://gotdotnet.ru";
        string url3 = "http://blogs.msdn.com";
        var webRequest1 = WebRequest.Create(url1);
        var webResponse1 = webRequest1.GetResponse();
        Console.WriteLine("{0} : {1}, elapsed {2}ms", url1,
            webResponse1.ContentLength, sw.ElapsedMilliseconds);

        var webRequest2 = WebRequest.Create(url2);
        var webResponse2 = webRequest2.GetResponse();
        Console.WriteLine("{0} : {1}, elapsed {2}ms", url2,
            webResponse2.ContentLength, sw.ElapsedMilliseconds);

        var webRequest3 = WebRequest.Create(url3);
        var webResponse3 = webRequest3.GetResponse();
        Console.WriteLine("{0} : {1}, elapsed {2}ms", url3,
            webResponse3.ContentLength, sw.ElapsedMilliseconds);
    }

    У синхронного выполнения операций, подобных чтению веб-страниц имеется ряд недостатков: во-первых, они блокируют текущий поток на неопределенный срок, а ведь этот поток вполне может оказаться потоком пользовательского интерфейса, и наше приложение будет выглядеть замечательным беленьким окошком с песочными часиками посередине. Кроме того, если каждый запрос будет длиться 5 секунд, то три запроса будут выполняться 15 секунд, в то время, как при параллельном выполнении все три запроса могли бы выполняться немногим более 5 секунд. Конечно, этого никогда не будет, особенно при обращении к веб-страницам, поскольку часть из этого времени тратится на передачу данных по разделяемому каналу, что приведет к увеличению времени выполнения каждого параллельного запроса, но у запроса есть и постоянная составляющая, которая не будет увеличиваться при одновременном выполнении нескольких запросов. И хотя о конкретных цифрах увеличения эффективности параллельного выполнения подобных операций можно спорить очень долго, можно с уверенностью сказать, что асинхронное выполнение операций, интенсивно использующих ввод/вывод (IO Bound), дело выгодное, как с точки зрения эффективности, так и масштабируемости.

    Поскольку идея асинхронного выполнения далеко не нова, классы, подобные WebRequest (и его наследник HttpWebRequest) поддерживают специальный набор операций, который позволяет выполнять длительные операции асинхронно с помощью методов BeginGetResponse и EndGetResponse. Традиционно, такая модель называется APM – Asynchronous Programming Model.

    static void SimpleApm()
    {
        string url1 = "http://rsdn.ru";
        string url2 = "http://gotdotnet.ru";
        string url3 = "http://blogs.msdn.com";
        var webRequest1 = WebRequest.Create(url1);
        webRequest1.BeginGetResponse(ProcessWebRequest, webRequest1);

        var webRequest2 = WebRequest.Create(url2);
        webRequest2.BeginGetResponse(ProcessWebRequest, webRequest2);

        var webRequest3 = WebRequest.Create(url3);
        webRequest3.BeginGetResponse(ProcessWebRequest, webRequest3);
    }


    static void ProcessWebRequest(IAsyncResult ar)
    {
        var webRequest = (WebRequest)ar.AsyncState;
        var webResponse = webRequest.EndGetResponse(ar);
        Console.WriteLine("{0}: {1}", webRequest.RequestUri, webResponse.ContentLength);
    }

    Теперь основной поток выполняет только запросы на исполнение асинхронных операций, а сами операции выполняются в рабочих потоках пула потоков и большую часть времени спят, ожидая завершения операций ввода/вывода. Это позволяет поставить практически одновременно все три запроса ввода/вывода и независимо ожидать завершения каждого из них. В данном коде дополнительный метод ProcessWebRequest не дает нам ничего, кроме захламления кода, усложнения его понимания и подержи. Некоторые из этих проблем легко решить с помощью анонимных методов:

    static void LambdaBasedApm()
    {
        var sw = Stopwatch.StartNew();
        string url1 = "http://rsdn.ru";
        string url2 = "http://gotdotnet.ru";
        string url3 = "http://blogs.msdn.com";
        var webRequest1 = WebRequest.Create(url1);
        var ar1 = webRequest1.BeginGetResponse(ar =>
            {
                var webResponse = webRequest1.EndGetResponse(ar);
                Console.WriteLine("{0}: {1}, elapsed {2}ms", webRequest1.RequestUri,
                        webResponse.ContentLength, sw.ElapsedMilliseconds);
            }, null);

        var webRequest2 = WebRequest.Create(url2);
        var ar2 = webRequest2.BeginGetResponse(ar =>
        {
            var webResponse = webRequest2.EndGetResponse(ar);
            Console.WriteLine("{0}: {1}, elapsed {2}ms", webRequest2.RequestUri,
                        webResponse.ContentLength, sw.ElapsedMilliseconds);
        }, null);

        var webRequest3 = WebRequest.Create(url3);
        var ar3 = webRequest3.BeginGetResponse(ar =>
        {
            var webResponse = webRequest3.EndGetResponse(ar);
            Console.WriteLine("{0}: {1}, elapsed {2}ms", webRequest3.RequestUri,
                webResponse.ContentLength, sw.ElapsedMilliseconds);
        }, null);

        // Этим "хитрым" способом мы обходим проблему вызова метода WaitHandle.WaitAll
        // с STAThreading Appartments
        var handles = new WaitHandle[] {ar1.AsyncWaitHandle,
            ar2.AsyncWaitHandle, ar3.AsyncWaitHandle};
        foreach(var myWaitHandle in handles)
            WaitHandle.WaitAny(new WaitHandle[]{myWaitHandle});           
    }

    С помощью анонимных функций мы устранили не только дополнительную функцию, но также необходимость передавать нужный экземпляр WebRequest через AsyncState. Но все еще осталась проблема с тем, что операция BeginXXX и EndXXX выполняются в разных потоках с разными стеками, мы не можем воспользоваться такими удобными и привычными вещами, как блоками try/catch/finally, lock или using. Только представьте себе, насколько усложнится приведенный выше код, если в него добавить обработку ошибок, закрытие объектов WebRequest или еще одну асинхронную операцию, использующую управляемый ресурс.

    Анатомия AsyncEnumerator

    Язык C# содержит синтаксический сахар, который позволяет несколько упростить нашу задачу. Давайте рассмотрим следующий псевдокод:

    static IEnumerable<int> DraftSolution(/*...*/)
    {
        var sw = Stopwatch.StartNew();
        string url = "http://rsdn.ru";
        var webRequest = WebRequest.Create(url);
               
        // Предполагаем, что мы знаем, как получить функцию обратного вызова,
        // которая будет вызвана по завершению операции
        AsyncCallback asyncCallback = null;
               
        // Инициируем запрос асинхронной операции
        var ar = webRequest.BeginGetResponse(asyncCallback, null);
               
        // Возвращаем управление вызывающему коду
        // и ожидаем завершения одной операции ввода-вывода
        yield return 1;
               
        // Предполагаем, что мы снова получим управление уже после завершения
        // асинхронной операции
        var webResponse = webRequest.EndGetResponse(ar);
        Console.WriteLine("{0}: {1}, elapsed {2}ms", webRequest.RequestUri,
            webResponse.ContentLength, sw.ElapsedMilliseconds);
    }

    Блоки итераторов (Iterator block) разворачиваются в конечный автомат и каждый блок кода между операторами yield return выполняются после вызова методов MoveNext итератора, а все локальные переменные превращаются в поля сгенерированного класса (более подробно внутреннее устройство итераторов я рассматривал ранее: части 1, 2, 3). Блоки итераторов содержат ряд ограничений, например, мы не можем их использовать в анонимных методах, не можем использовать out и ref параметры, мы не можем использовать yield return из блоков catch или finally. Несмотря на это, использование асинхронных операций совместно с блоками итераторов позволит использовать блоки try/finally и, соответственно, такие конструкции как using или lock.

    Реализация класса AsyncEnumerator, на плечи которого и будет возлагаться вся оставшаяся функциональность, достаточно проста, класс содержит всего несколько методов и в простом виде может быть реализован за полчаса. Для начала давайте рассмотрим, из чего он состоит:

    // Класс, упрощающий работу с асинхронными операциями
    public sealed class AsyncEnumerator2
    {
        // Возвращает функцию обратного вызова, которую нужно передать
        // методу BeginXXX
        public AsyncCallback GetEndCallback();

        // Выполнение асинхронной операции.
        // Функция вернет управление только после завершения всех операций.
        public void Execute(IEnumerator<int> enumerator);
           
        // Функция вызывается при завершении асинхронной операции
        private void EndCallback(IAsyncResult ar);

        // Переходит к очередному блоку кода внутри блока итератора
        private void ResumeIterator();
    }

    Метод Execute принимает в качестве параметра IEnumerable<int>, каждый элемент которого указывает на количество асинхронных операций, выполнение которых нужно дождаться перед переходом к следующему коду внутри блока итератора. Этот метод не возвращает управление до тех пор, пока все асинхронные операции не будут выполнены. Основная же работа ложится на плечи метода ResumeIterator, который как раз и получает каждый элемент коллекции и не переходит к следующему элементу, пока не будет выполнено запрошенное количество асинхронных операций. Метод EndCallback вызывается при завершении асинхронной операции, увеличивает количество уже завершенных операций и вызывает ResumeIterator, если количество завершенных операций соответствует текущему элементу енумератора.

    // Переходит к очередному блоку кода внутри блока итератора
    private void ResumeIterator()
    {
        bool iterationFinished;
        while (iterationFinished = enumerator.MoveNext())
        {
            // Получаем текущее количество асинхронных операций
            int current = enumerator.Current;
            lock (numbersSynchandle)
            {
                this.numberOfFinishedOperations = 0;
                if (current > numberOfFinishedOperations)
                {
                    // "Запоминаем" текущее количество запрошенных операций,
                    // чтобы перейти к следующему коду внутри блока итератора
                    // только при завершении данного количества асинхронных операций
                    this.numberOfPendingOperations = current;

                    // Еще недостаточно завершенных асинхронных операций,
                    // чтобы переходить к следующему блоку внутри блока итераторов
                    break;
                }
            }
        }

        // Выходим из функции и ожидаем ее последующего вызова
        // после завершения необходимого количества операций
        if (iterationFinished)
            return;

        // Мы получили все элементы блока итератора.
        // Вызываем очистку ресурсов.
        enumerator.Dispose();

        // "Говорим" методу Excecute о том, что выполнение всех операций
        // завершено
        waitFinishedEvent.Set();
    }

    private void EndCallback(IAsyncResult ar)
    {
        bool needResume;
        lock (numbersSynchandle)
        {
            numberOfFinishedOperations++;
            // Возобновляем выполнение итератора, если
            // завершено достаточное количество асинхронных операций
            needResume = numberOfPendingOperations <= numberOfFinishedOperations;
        }
        if (needResume)
            ResumeIterator();
    }

    В приведенной реализации я сознательно не добавлял обработку ошибок и использовал наиболее простые способы синхронизации,  чтобы простота идеи не была скрыта за этими деталями. Теперь зная о реализации, давайте рассмотрим более сложный пример использования асинхронных операций, а этот же вариант, но с использованием стандартного механизма APM, как это обычно, оставим на совесть любознательных читателей (хотя я думаю, что любой читатель с минимальным опытом согласится, что вряд ли он будет таким же простым).

    static IEnumerator<int> UsingAsyncEnumerator(AsyncEnumerator ae)
    {
        var sw = Stopwatch.StartNew();
        var urls = new string[] {
                    "http://rsdn.ru",
                    "http://gotdotnet.ru",
                    "http://blogs.msdn.com",};
        // Нужно обязательно вызвать ToList, поскольку в противном случае
        // ленивость query comprehensions не приведет к вызову методов
        // BeginGetResponse
        var webRequests = (from url in urls
                            let webRequest = WebRequest.Create(url)
                            let asyncResult = webRequest.BeginGetResponse(ae.GetEndCallback(), null)
                            select new {Url = url, WebRequest = webRequest,
                                AsyncResult = asyncResult}).ToList();
        // Ожидаем выполнения всех запросов
        yield return urls.Length;
               
        var sb = new StringBuilder();
        foreach (var wr in webRequests)
        {
            var webResponse = wr.WebRequest.EndGetResponse(wr.AsyncResult);
            var text = string.Format("{0}: {1}, elapsed {2}ms", wr.Url,
                webResponse.ContentLength, sw.ElapsedMilliseconds);
            Console.WriteLine(text);
            sb.AppendLine(text);
        }
                       
        var outputText = sb.ToString();
        using (var fs = new FileStream("d:\\results.txt", FileMode.Create,
            FileAccess.Write, FileShare.Read))
        {
            var ar = fs.BeginWrite(UnicodeEncoding.Default.GetBytes(outputText), 0,
                UnicodeEncoding.Default.GetByteCount(outputText), ae.GetEndCallback(), null);
                   
            // Ожидаем завершения всего одной операции
            yield return 1;
            fs.EndWrite(ar);
            Console.WriteLine("Saving to file finished");
        }
    }
    //...
    var ae = new AsyncEnumerator();
    ae.Execute(UsingAsyncEnumerator(ae));

    В приведенном примере не просто используется несколько асинхронных операций: результаты набора одних операций элегантным образом используются в другой операции, которая, к тому же, использует конструкцию using языка C#. Использование класса AsyncEnumerator позволяет пользоваться всеми преимуществами синтаксиса синхронных операций, тем не менее, выполняя их асинхронно.

    Данная реализация класса AsyncEnumerator очень простая, она не содержит обработки ошибок и большого количества функционала, который присутствует в оригинальной библиотеке PowerThreading. Так, например, библиотека Джеффри Рихтера и компании поддерживает отмену операций, контексты синхронизации, содержит асинхронную версию метода Execute (в виде BeginExecute и EndExecute) и многое другое. Но зная основные принципы, на которых построена эта библиотека, разобраться с оставшимся функционалом не составит большого труда.

    В следующий раз: чем может помочь Reactive Extensions в вопросах работы с асинхронными операциями.

    8 комментариев:

    1. Ваш код про LambdaBasedApm не работает...
      Не могу понять как главный поток будет ждать дочерние потоки.

      http://rsdn.ru/: 1672, elapsed 81ms
      http://gotdotnet.ru/: 81782, elapsed 336ms
      Все скачали
      http://blogs.msdn.com/: 47784, elapsed 617ms

      ОтветитьУдалить
    2. Ну программа примерно такя
      static void Main(string[] args)
      {
      LambdaBasedApm();
      Console.WriteLine("Все скачали");
      Console.ReadLine();
      }

      ОтветитьУдалить
    3. Ну, дело не в том, что этот код не работает. Я намеренно убрал из этого метода код, ожидающий завершения (хотя вомозможно и зря). У меня в реальном тестовом примере используется такой код в самом конце функции LambdaBasedApm:


      // Этим "хитрым" способом мы обходим проблему вызова метода WaitHandle.WaitAll
      // с STAThreading Appartments
      var handles = new WaitHandle[] {ar1.AsyncWaitHandle,
      ar2.AsyncWaitHandle, ar3.AsyncWaitHandle};
      foreach(var myWaitHandle in handles)
      WaitHandle.WaitAny(new WaitHandle[]{myWaitHandle});

      ОтветитьУдалить
    4. Ну так этот код есть, но нету кода который дает сиглан о завершении потока. Что будет ждать WainAny если никто Set не скажет ?

      ОтветитьУдалить
    5. Set будет дернут автоматически при завершении асинхронной операции. См. документацию на IAsyncResult.AsyncWaitHandle:
      Gets a WaitHandle that is used to wait for an asynchronous operation to complete.

      ОтветитьУдалить
    6. Наверное проблема в том, что Set возникает сразу после получения Response.
      Потом основной поток продолжает работать и выводит сообщение "Все скачали"
      И только потом выполняется лямбда, которая выводит резцультат по последнему потоку
      :(

      ОтветитьУдалить
    7. @Mik: Да, мы получаем классическую гонку, поэтому для борьбы с этим нам нужны дополнительные средства синхронизации.

      ОтветитьУдалить
    8. lock (numbersSynchandle) в ResumeIterator() должен стоять перед while (iterationFinished = enumerator.MoveNext())

      ОтветитьУдалить