четверг, 2 декабря 2010 г.

Знакомство с асинхронными операциями в C# 5

В прошлый раз мы говорили о том, как можно упростить работу с асинхронными операциями с помощью класса AsyncEnumerator Джеффри Рихтера, а также с помощью библиотеки Reactive Extensions. Однако, как поведал нам недавно Хейлсберг с компанией, счастье таки наступит с выходом новой версии языка C# и нам не понадобится использовать сторонние и не очень велосипеды, при работе с асинхронными операциями.

Идея, которая лежит в основе асинхронных операций в C# 5, очень похожа на ту, которую использовал Джеффри Рихтер в своем классе AsyncEnumerator, только на этот раз помимо нас с вами и старины Рихтера о ней еще узнал и компилятор (что здорово сказывается на вкусовых качествах сего синтаксического сахарка).  Для начала, давайте вернемся к синхронной версии нашего гениального примера, который мы использовали ранее:

static void SyncVersion()
{
    Stopwatch sw = Stopwatch.StartNew();
    string url1 = "http://rsdn.ru";
    string url2 = "http://gotdotnet.ru";
    string url3 = "http://blogs.msdn.com";
    var webRequest1 = WebRequest.Create(url1);
    var webResponse1 = webRequest1.GetResponse();
    Console.WriteLine("{0} : {1}, elapsed {2}ms", url1,
        webResponse1.ContentLength, sw.ElapsedMilliseconds);

    var webRequest2 = WebRequest.Create(url2);
    var webResponse2 = webRequest2.GetResponse();
    Console.WriteLine("{0} : {1}, elapsed {2}ms", url2,
        webResponse2.ContentLength, sw.ElapsedMilliseconds);

    var webRequest3 = WebRequest.Create(url3);
    var webResponse3 = webRequest3.GetResponse();
    Console.WriteLine("{0} : {1}, elapsed {2}ms", url3,
        webResponse3.ContentLength, sw.ElapsedMilliseconds);
}

Здесь все очень просто (и главное весьма оригинально!): мы прикидываемся, что мы пишем что-то типа своего браузера, ну или просто нам делать нечего и нужно получить содержимое трех веб-страниц. Синхронная версия, как всегда замечательно работает, за исключением того, что мы ждем втрое больше времени, нежели могли бы, запустив эту операцию асинхронно. Поскольку, мы уже поняли, что это плохо и с удовольствием посмотрели выступление Хейлсберга, то попробуем прямо здесь, так же как и он, двумя росчерками мышки и тремя нажатиями на клаву, сделать из этого замечательного синхронного метода… асинхронный.

Первое, что нужно сделать, это изменить объявление нашей функции на следующее:

static async void AsyncVersion()

Ключевое слово async (которое в CTP версии является ключевым словом, а в релизе будет контекстным ключевым словом) в сигнатуре метода говорит о том, что этот метод выполняется асинхронно и возвращает управление вызывающему коду сразу после начала некоторой асинхронной операции. «Асинхронные методы» могут возвращать один из трех типов возвращаемого значения: void, Task и Task<T>. Если метод возвращает void, то это будет асинхронная операция типа: «запустили и забыли» («fire and forget»), поскольку обработать результат этой операции будет невозможно (хотя если не обработать исключения внутри этого метода, то грохнется он славно, и вполне может потянуть за собой и весь процесс!). В некоторых сценариях это бывает полезным (например, мы захотим асинхронно уведомить всех удаленных подписчиков и нам плевать на то, получат они эти сообщения или нет). С классами Task и Task<T> любознательный читатель (которому еще окончательно не надоели темпы, с которыми всеми нами любимая компания из Редмонда выпускает новые фичи), может быть знаком, поскольку они уже некоторое время живут и здравствуют в .Net Framework 4-й версии. Основная идея этих классов заключается в том, что они в себе инкапсулируют «незавершенную задачу» и мы можем дождаться ее завершения, установить «продолжение» (нечто, что должно быть вызвано при завершении этой задачи) и т.п. При этом класс Task<T> является наследником класса Task и отличается от последнего тем, что позволяет получить возвращаемое значение типа T посредством свойства Result, в то время, как класс Task скорее говорит о том, что некоторая задача возвращает void, и ценна за счет своих побочных эффектов.

Поскольку мы не просто хотим запустить на выполнение асинхронную операцию, но и получить результаты ее выполнения, но при этом нам важен не сам результат, а побочный эффект… В общем, мы используем тип Task в качестве возвращаемого значения (хотя могли бы спокойно использовать Task<string>, но, в общем, это не столь важно).

Изменив сигнатуру метода, нужно изменить немного и его тело. Для этого строки вида:

var webResponse1 = webRequest1.GetResponse();

Нужно заменить на:

var webResponse1 = await webRequest1.GetResponseAsync();

Ключевое слово await (в релизе это тоже будет контекстным ключевым словом) совсем не значит то, что мы залипнем в этой точке кода до тех пор, пока запрошенная асинхронная операция не будет выполнена. Это может сбивать с толку (и Эрик с компанией даже предлагают подыскать другие ключевые слова), но это означает в точности противоположную идею. Ключевое слово await означает, что после начала асинхронной операции (в данном случае после начала асинхронной операции получения WebResponse) метод должен вернуть управление, а продолжить его с этого же места уже после завершения асинхронной операции. Мудрыми словами это называется продолжениями и у Эрика есть с десяток статей, способных здорово вклинить мозги по этому поводу.

Если не вдаваться в матан, то в данном случае можно провести параллель с блоками итераторов в C# (если вы не знаком с этой темой, то я очень вам рекомендую это исправить), поскольку компилятор в обоих случаях генерирует конечный автомат, который отслеживает, где было прервано выполнение метода, и используется для корректного восстановления выполнения при последующем вызове. Однако, если в случае итераторов возобновление выполнения блока итераторов происходит при последующем вызове метода MoveNext внешним кодом, то асинхронный метод продолжит выполняться автоматом после завершения асинхронной операции. Для этого, в качестве «продолжения» текущей задачи устанавливается текущий метод, который и вызывается при завершении текущей задачи.

Итак, вот весь измененный метод целиком:

static async void AsyncVersion()
{
    Stopwatch sw = Stopwatch.StartNew();
    string url1 = "http://rsdn.ru1";
    string url2 = "http://gotdotnet.ru";
    string url3 = "http://blogs.msdn.com";
   
    var webRequest1 = WebRequest.Create(url1);
    Console.WriteLine("Before webRequest1.GetResponseAsync(). Thread Id: {0}",
        Thread.CurrentThread.ManagedThreadId);

    var webResponse1 = await webRequest1.GetResponseAsync();
    Console.WriteLine("{0} : {1}, elapsed {2}ms. Thread Id: {3}", url1,
        webResponse1.ContentLength, sw.ElapsedMilliseconds,
        Thread.CurrentThread.ManagedThreadId);

    var webRequest2 = WebRequest.Create(url2);
    Console.WriteLine("Before webRequest2.GetResponseAsync(). Thread Id: {0}",
        Thread.CurrentThread.ManagedThreadId);

    var webResponse2 = await webRequest2.GetResponseAsync();
    Console.WriteLine("{0} : {1}, elapsed {2}ms. Thread Id: {3}", url2,
        webResponse2.ContentLength, sw.ElapsedMilliseconds,
        Thread.CurrentThread.ManagedThreadId);

    var webRequest3 = WebRequest.Create(url3);
    Console.WriteLine("Before webRequest3.GetResponseAsync(). Thread Id: {0}",
        Thread.CurrentThread.ManagedThreadId);
    var webResponse3 = await webRequest3.GetResponseAsync();
    Console.WriteLine("{0} : {1}, elapsed {2}ms. Thread Id: {3}", url3,
        webResponse3.ContentLength, sw.ElapsedMilliseconds,
        Thread.CurrentThread.ManagedThreadId);
}

(Если честно, то я еще изменил вывод на консоль, чтобы было видно в каком потоке выполняется метод в данный момент, но это ведь наглядности ради, так что это изменение не в счет. Но в любом случае, старина Хейлсберг таки не соврал, изменений, действительно, минимальное количество).

И вот как этот метод вызывается:

static void Main(string[] args)
{

    try
    {
        Console.WriteLine("Main thread id: {0}", Thread.CurrentThread.ManagedThreadId);
        var task = AsyncVersion();
        Console.WriteLine("Right after AsyncVersion() method call");
        //Ожидаем завершения асинхронной операции
        task.Wait();
        Console.WriteLine("Asyncronous task finished!");
       
    }
    catch(System.AggregateException e)
    {
        //Все исключения в TPL пробрасываются обернутые в AggregateException
        Console.WriteLine("AggregateException: {0}", e.InnerException.Message);
    }
    Console.ReadLine();
}

А вот результат его выполнения:

Main thread id: 10
Before webRequest1.GetResponseAsync(). Thread Id: 10
Right after AsyncVersion() method call
http://rsdn.ru : 1672, elapsed 657ms. Thread Id: 13
Before webRequest2.GetResponseAsync(). Thread Id: 13
http://gotdotnet.ru : 99470, elapsed 1915ms. Thread Id: 14
Before webRequest3.GetResponseAsync(). Thread Id: 14
http://blogs.msdn.com : 47927, elapsed 2628ms. Thread Id: 15
Asynchronous task finished!

А теперь давайте разберем подробно, что происходит внутри этого зверя. Итак, вызов метода AsyncVersion происходит в текущем потоке, и управление возвращается сразу же после первого оператора await. Но прежде чем вернуть управление вызывающему коду, метод AsyncVersion устанавливается в качестве «продолжения» текущей задачи и запоминается место, с которого нужно продолжить выполнение. Эта функция возвращает объект типа Task, для того, чтобы мы смогли дождаться завершения и проверить результаты. Затем, после завершения асинхронной операции, выполнение возобновляется с предыдущего места и, как мы видим, уже в другом потоке. Далее, этот процесс продолжается до тех пор, пока не будет завершена третья асинхронная операция, после чего метод task.Wait вернет управление, и мы увидим на консоли заветное: “Asynchronous task finished!”.

Обработка ошибок также претерпела некоторых изменений, но также весьма незначительных. Если вы уже знакомы с TPL, то узнаете знакомый класс System.AggregateExcpetion, который «собирает» все исключения и накапливает их у себя внутри. Причина этого заключается в том, что у одной задачи может быть десяток дочерних задач, каждая из которых может содержать еще несколько «подзадач», и каждое задание из этого «дерева» заданий может свалиться с собственным исключением. Именно для решения этой задачи и служит AggregateException, который содержит в себе «дерево» исключений, которое можно легко «выпрямить» с помощью метода Flatten (подробности можно почитать, например, здесь, раздел «Работа с AggregateException»). В общем, если обработка ошибок и усложнилась то не значительно, а сравнивая это с ночным кошмаром, с которым приходится сталкиваться при работе с APM, то такое «усложнение», даже проблемой назвать пальцы не повернутся.

На самом деле, этот пример не очень-то отличается от синхронного; ведь мы подряд выполняем три асинхронные операции, при этом каждая последующая операция начинается только после завершения предыдущей. Давайте попробуем переписать в новом виде наш второй пример, который одновременно получает результаты от трех веб-страниц и асинхронно записывает результат в файл:

public static async void AsyncVersion2()
{
    Stopwatch sw = Stopwatch.StartNew();
    var urls = new string[] {"http://rsdn.ru", "http://gotdotnet.ru",
        "http://blogs.msdn.com"};
    var tasks = (from url in urls
                let webRequest = WebRequest.Create(url)
                select new {Url = url, Response = webRequest.GetResponseAsync()})
                .ToList();
    var data = await TaskEx.WhenAll(tasks.Select(t=>t.Response));
    var sb = new StringBuilder();
    foreach(var s in tasks)
    {
        sb.AppendFormat("{0}: {1}, elapsed {2}ms. Thread Id: {3}", s.Url,
            s.Response.Result.ContentLength,
            sw.ElapsedMilliseconds, Thread.CurrentThread.ManagedThreadId)
            .AppendLine();
    }
    var outputText = sb.ToString();
    Console.WriteLine("Web request results: {0}", outputText);
           
    using (var fs = new FileStream("d:\\results.txt", FileMode.Create,
            FileAccess.Write, FileShare.Write))
    {
        await
        fs.WriteAsync(UnicodeEncoding.Default.GetBytes(outputText), 0,
                        UnicodeEncoding.Default.GetByteCount(outputText));
    }
           
}

Вот это уже действительно интересно! Мы получили полностью асинхронный код, но при этом он не выглядит так, будто над ним несколько ночей трудилось стадо индусов (а ведь асинхронный код подобной сложности именно так и выглядит). Он понятен и читается аналогично синхронному! (Если кто не верит, то пусть попробует переписать этот код в классическом APM стиле).

Итак, что мы имеем? Мы имеем штуку, которой действительно удобно пользоваться без боязни отстрелить себе ногу. Кроме того, даже не вдаваясь в серьезные дебри, более или менее понятно, как этим делом пользоваться (мда, хотя к именам придется привыкнуть). И самое интересное, что это далеко не вся функциональность. Я не рассказывал о контекстах синхронизации и о том, как это дело можно здорово использовать с потоком пользовательского интерфейса, о том, что такое TAP (Task-based Asynchronous Pattern), не говорил о том, что все новые асинхронные методы, которые я использовал являются методами расширения и вы можете их добавлять самостоятельно сколько угодно, да и не вдавался в дебри реализации… Но об этом всем, как-нибудь в другой раз! А сегодня, я вам предлагаю, просто скачать эту штуку и попробовать ее самостоятельно!

(Чуть не забыл, вот url, чтобы вы долго его не искали: msdn.com/vstudio/async)

9 комментариев:

  1. Сергей, а Вы не разбирались, как работает следующее (я про версию Async2) - var data = await TaskEx.WhenAll(tasks.Select(t=>t.Response));
    По идее - запустится 3 задачи и WhenAll будет ждать пока не получим для каждой response. Но кто запустит эти задачи, если глубже копнуть?

    ОтветитьУдалить
  2. 2eugene: На самом деле задачи запускаются не в этой строке, а в предыдущей:

    var tasks = (from url in urls
    let webRequest = WebRequest.Create(url)
    select new {Url = url, Response = webRequest.GetResponseAsync()})
    .ToList();
    И запустит их метод ToList, поскольку этот query comprehension выполняется отложенно (на самом деле он выполняется либо при переборе элементов или при вызове некоторых методов, например, метода ToList()).
    Метод TaskEx.WhenAll всего лишь принимает набор задач и создает еще одну задачу, которая будет выполнена только тогда, когда все задачи будут, в свою чередь, завершены.
    З.Ы. Если можно на ты.
    З.Ы.Ы. Надеюсь понятно (хотя и несколько сумбурно), но если что - разъясню подродбнее.

    ОтветитьУдалить
  3. Ок. Возможно это я несколько сумбурно сформулировал вопрос. Было интересно не кто их запустит, а как WhenAll задача дождется окончания запущенных задач. Ну да ладно. Сам пока не пробовал, но интересно, что будет с локальными переменными после вызова await? Поясню на пальцах :)
    int someVariable = 1;
    ...
    await Foo();
    ...
    someVariable- ?
    Я к чему, судя по тому что написано, awaitFoo() прерывает выполнение функции. После того, как асинхронный метод будет выполнен в псевдопоследовательном режиме сохранятся ли значения локальных переменных и если да( а я думаю да) за счет чего это достигается?

    ОтветитьУдалить
  4. Локальная переменная someVariable перестанет находится в стеке, она станет полем сгенерированного класса.

    Аналогично происходит и в блоке итераторов и в замыканиях.

    Вот здесь я описывал процесс преобразования кода в случае замыканий и можно считать, что нечто подобное будет и в этом случае.

    ОтветитьУдалить
  5. А, тогда понятно. Все как в замыканиях с формированием классов.

    ОтветитьУдалить
  6. Кстати, у Эрика была отличная статья (вот она же на русском), в которой он рассказал о том, когда значимые типы будут расположены в стеке, а когда в куче.

    ОтветитьУдалить
  7. Всё это оч забавно, только непонятно, чем оно лучше старых добрых Threads??
    Пишем:
    код1
    new Thread(код в треде).Start()
    код2
    Всё ясно и не требует никаких новых ключевых слов. ЗАЧЕМ изобретать никому не нужный сахер?

    ОтветитьУдалить
  8. Есть хорошая статья у Sacha Barber'а
    http://www.codeproject.com/Articles/152765/Task-Parallel-Library-of-n
    В разделе Threads Versus Tasks приведена простая программа, благодаря которой можно убедиться, что использование Task'ов более выгодно, чем Thread

    It can be seen that even in this little experiment, the creation time of Threads to do the same job as the Tasks is far greater. This is undoubtedly down to the work that has to be done to create and manage a classic Thread. Like I say, most classic threading developers would use the ThreadPool which would give better results, but like I also said in the introduction, the classic ThreadPool has its limitations, these limitations are all taken care of with TPL.

    ОтветитьУдалить
  9. Опечатка. Вместо
    static async void AsyncVersion()
    должно быть
    static async Task AsyncVersion()

    ОтветитьУдалить