воскресенье, 28 июня 2015 г.

Идиома Process Tasks By Completion

При работе с тасками часто возникает такая задача: у нас есть набор входных данных и для обработки каждого элемента используется длительная операция.

Можно подойти к этой задаче в лоб. Крутим цикл, запускаем таски, обрабатываем результаты по одному:

private Task<Weather> GetWeatherForAsync(string city)
{
   
Console.WriteLine("[{1}]: Getting the weather for '{0}'"
, city,
       
DateTime.Now.
ToLongTimeString());
   
return WeatherService.
GetWeatherAsync(city);
}


[
Test]
public async Task
ProcessOneByOneNaive()
{
   
var cities = new List<string> { "Moscow", "Seattle", "New York"
};

   
var tasks =
        from city in
cities
       
select new { City = city, WeatherTask =
GetWeatherForAsync(city) };

   
foreach (var entry in
tasks)
    {
       
var wheather = await entry.
WeatherTask;

        ProcessWeather(entry
.
City, wheather);
    }
}

private void ProcessWeather(string city, Weather
weather)
{
   
Console.WriteLine("[{2}]: Processing weather for '{0}': '{1}'"
, city, weather,
       
DateTime.Now.ToLongTimeString());
}

Здесь мы обращаемся к некоторому сервису погоды, для получения температуры в каждом городе, затем обрабатываем полученные результаты путем вывода города и температуры на экран.

Подход рабочий, но есть одна проблема: новая задача будет запущена лишь после завершения предыдущей. Тут можно принудительно дернуть все задачи и вызвать ToList() на LINQ-запросе, но и в этом случае задачи будут обрабатываться в порядке «городов», а не в порядке доступности результатов (предположим, что для получения погоды для первого города уйдет втрое больше времени, чем для других; в этом случае мы будем ждать результат по первому городу, хотя результаты по двум другим уже доступны).

Решение заключается в использовании идиомы Process Tasks by Completion (можете называть это паттерном, если хотите), которая заключается в следующем: задачи должны обрабатываться не в порядке их запуска, а в порядке их завершения.

Вот как это будет выглядеть:

[Test]
public async Task
ManualProcessByCompletion()
{
   
var cities = new List<string> { "Moscow", "Seattle", "New York"
};
   
var tasks = (from city in
cities
              
let result = new { City = city, WeatherTask =
GetWeatherForAsync(city) }
              
select TaskEx.FromTask(result, r => r.WeatherTask)).
ToList();

   
while (tasks.Count != 0
)
    {
       
var completedTask = await Task.
WhenAny(tasks);

        tasks
.
Remove(completedTask);

       
var result = completedTask.
Result;

        ProcessWeather(result
.City, result.WeatherTask.Result);
    }
}
ПРИМЕЧАНИЕ
В одном из курсов Pluralsight данная идиома называется Process tasks one by one, но, ИМХО, это еще менее понятное название, чем у меня. Так что если есть мысли, какое название будет лучше передавать ее суть – буду рад выслушать варианты!

Метод ManualProcessByCompletion, запускает получение погоды с помощью GetWeatherForAsync для всех городов, оборачивает объект анонимного типа в таску (позже объясню, почему это нужно). Затем, внутри цикла while мы зовем Task.WhenAny и получаем первую завершенную задачу. Получается, что задачи обрабатываются по мере завершения, а не по порядку их запуска. Но в данном случае, таска содержит ответ на вопрос (какая погода в городе?), но не содержит самого вопроса (имени города). Нам нужно как-то объединить результат и контекст исполнения. Для этого используется метод TaskEx.FromTask:

public static Task<T> FromTask<T, U>(T result, Func<T, Task<U>> taskSelector)
{
   
Contract.Requires(taskSelector != null
);

   
var tcs = new TaskCompletionSource<T>
();
   
var task =
taskSelector(result);

    task
.ContinueWith(t =>
    {
       
if (t.
IsFaulted)
            tcs
.SetException(t.
Exception);
       
else if (t.
IsCanceled)
            tcs
.
SetCanceled();
       
else
            tcs.
SetResult(result);
    });

   
return tcs.Task;
}

Метод TaskEx.FromResult, создает прокси-таску, которая завершится при завершении оригинальной задачи. А делегат taskSelector позволяет «извлечь» задачу из основного объекта, что позволяет удобно использовать этот подход совместно с анонимными типами.

Новый подход работает лучше оригинального, но выглядит более громоздким. Есть смысл сделать небольшую обертку, которая позволит использовать его повторно.

public static IEnumerable<Task<TElement>> OrderByCompletion<TElement, TTaskResult>(
   
this IEnumerable<TElement> sequence, Func<TElement, Task<TTaskResult>>
taskSelector)
{
   
Contract.Requires(sequence != null
);
   
Contract.Requires(taskSelector != null
);

   
var tasks = (from element in
sequence
               
let pair = new {Element = element, Task =
taskSelector(element)}
               
select FromTask(pair, p => p.Task)).
ToList();
                       
   
while (tasks.Count != 0
)
    {
       
var tcs = new TaskCompletionSource<TElement>
();

       
// Getting the first finished task
        Task.WhenAny(tasks).ContinueWith(tsk =>
        {
           
var finishedTask = tsk.
Result;
            tasks
.
Remove(finishedTask);

            tcs
.FromTask(finishedTask, arg => arg.
Element);
        });

       
yield return tcs.Task;
    }
}

Полный код класса доступен на гитхабе, но смысл его такой. Метод преобразует последовательность задач в другую последовательность задач, порядок которых определяется порядком завершения задач, а не порядком в исходной последовательности.

И вот как выглядит пример использования:

[Test]
public async Task
ProcessByCompletion()
{
   
var cities = new List<string> { "Moscow", "Seattle", "New York"
};
           
   
var tasks =
        from city in
cities
       
select new {City = city, WeatherTask =
GetWeatherForAsync(city)};

   
foreach (var task in tasks.OrderByCompletion(t => t.
WeatherTask))
    {
       
var taskResult = await
task;

       
// taskResult is an object of anonymous type with City and WeatherTask
        ProcessWeather(taskResult.City, taskResult.WeatherTask.Result);
    }
}

Мы снова вернулись к исходному варианту с точки зрения синтаксиса, но оставили новое поведение. Вот результат исполнения, который показывает, что все задачи запускаются одновременно, а обработка происходит по мере поступления результатов:

[12:54:35 PM]: Getting the weather for 'Moscow'
[12:54:35 PM]: Getting the weather for 'Seattle'
[12:54:35 PM]: Getting the weather for 'New York'
[12:54:36 PM]: Processing weather for 'Seattle': 'Temp: 7C'
Got the weather for 'Moscow'
[12:54:39 PM]: Processing weather for 'Moscow': 'Temp: 6C'
Got the weather for 'New York'
[12:54:40 PM]: Processing weather for 'New York': 'Temp: 8C'

UPDATE:

Отказ от Query Comprehension Syntax-а позволит упростить последний пример еще немного:

[Test]
public async Task
ProcessByCompletion()
{
   
var cities = new List<string> { "Moscow", "Seattle", "New York"
};

   
var tasks = cities.Select(async city =>
    {
       
return new {City = city, Weather = await
GetWeatherForAsync(city)};
    });

   
foreach (var task in tasks.
OrderByCompletion())
    {
       
var taskResult = await
task;

       
// taskResult is an object of anonymous type with City and WeatherTask
        ProcessWeather(taskResult.City, taskResult.Weather);
    }
}

И вариант, предложенный @hazzik на основе Rx-ов:

[Test]
public void
ProcessOneUsingRx()
{
   
var cities = new[] { "Moscow", "Seattle", "New York"
};
   
var objs = cities.Select(async city => new
    {
        City
=
city,
        Weather
= await
GetWeatherForAsync(city)
    })
.Select(task => task.ToObservable()).Merge().
ToEnumerable();

   
foreach (var obj in
objs)
    {
        ProcessWeather(obj
.City, obj.Weather);
    }
}

Он работает следующим образом: вначале мы берем последовательность задач, конвертим ее в последовательность IEnumerable<IObservable<T>>, которая, затем мерджится в одну последовательность IObservable<T>.

Особенность этого подхода в том, что в данном случае вызов MoveNext в цикле foreach является блокирующим.

З.Ы. Код залит в новый репо на гитхабе - https://github.com/SergeyTeplyakov/TplTipsAndTricks

26 комментариев:

  1. А где вариант "решить задачу так, как это делают все нормальные люди"?

    *(использовать IObservable/IObserver)

    ОтветитьУдалить
    Ответы
    1. Чтобы не быть голословным: https://gist.github.com/hazzik/fec7d0a80757b5f1a236

      Удалить
    2. Этот комментарий был удален автором.

      Удалить
    3. Подумал еще немного. Тут даже RX не надо. Нужно просто перенести обработку в GetWeatherForAsync и сделать Task.WaitAll / Task.WhenAll

      (edit: spelling)

      Удалить
    4. Обрабатывать прямо в методе GetWeatherForAsync не всегда удобно. Размазывает ответветственности + бывает нужно, чтобы обработка была именно в одном потоке (если идет работа с UI-м).

      Чтобы не быть голословным, камрад @KirillOsenkov привел ссылку на пост Тауба, в котором приводится решение аналогичной задачи: http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx

      Удалить
    5. Я обновил gist, там последние 2 ревизии обработка в разных методах, как в примере. Это определенно задача для RX. Конечный .ToEnumerable() / .Do() сделаны, чтобы код был похож на оригинальный.

      Удалить
    6. Кстати, я видел этот пост, и никак не могу понять: в чем же разница, что тут нового? Зачем перепечатывать его почти через 3 года?

      Удалить
    7. > Кстати, я видел этот пост, и никак не могу понять: в чем же разница, что тут нового? Зачем перепечатывать его почти через 3 года?

      Я не уверен, про какой пост идет речь, но я при подготовке своего поста на пост Тауба так и не наткнулся. К тому же, в нашей же индустрии очень мало чего-то действительно нового, если следовать такому принципу, то блоги, книги и большинство софта вообще писать не имеет смысла;)

      Удалить
    8. > Я обновил gist, там последние 2 ревизии обработка в разных методах, как в примере. Это определенно задача для RX.

      Решение с Rx-ами - очень ОК, хотя я был бы не в восторге от просмотра подобного кода каждый раз, когда нужно обрабатывать задачи по мере их окончания.

      ИМХО, большинству программистов вариант с while и WhenAny будет читабельнее. Если же Rx-овые потроха спрятать за фасадным методом, таким как OrderByCompleted или Interleaved, но тогда важность использование того или иного подхода резко снижается и вариант в рукопашную становится снова более предпочтительным, поскольку утилитный метод перестает зависеть от сторонних библиотек.

      Удалить
    9. Мне ProcessByCompletion() кажется на порядок читабельней, чем ProcessOneUsingRx(). А в реалиях нашего проекта, когда каждую срочку стороннего кода нужно апрувить через совет "архитекторов" (я сейчас про использование RX), то подход ProcessOneUsingRx() вообще нам не подходит.

      Удалить
  2. >Если же Rx-овые потроха спрятать за фасадным методом.

    Там можно написать просто .ToObservable().Merge() (Как говорил последний ToEnumerable() не нужен на самом деле)

    >большинству программистов ... будет читабельнее.

    Если мы будем следовать такой логике, то мы так и останемся в .NET 2. Люди должны учить и применять новые вещи.

    >Я не уверен, про какой пост идет речь, но я при подготовке своего поста на пост Тауба так и не наткнулся.

    Он вторым (первым ссылка на его выдержку в MSDN https://msdn.microsoft.com/en-us/library/jj155756.aspx) идет по запросу "Process Tasks By Completion .NET" (.NET нужен, потому что иначе всякая фигня связанная с GTD вылазит в результатах). Так что странно.

    >К тому же, в нашей же индустрии очень мало чего-то действительно нового, если следовать такому принципу, то блоги, книги и большинство софта вообще писать не имеет смысла;)

    На самом деле появляется очень много всего нового. Но, я считаю именно так, что нет смысла писать, если нечего добавить.

    ОтветитьУдалить
    Ответы
    1. Даже комменты представленные ранее, дали мне достаточно пищи для размышления, чтобы я не считал свой труд потерей времени.

      > Он вторым (первым ссылка на его выдержку в MSDN https://msdn.microsoft.com/en-us/library/jj155756.aspx) идет по запросу "Process Tasks By Completion .NET" (.NET нужен, потому что иначе всякая фигня связанная с GTD вылазит в результатах). Так что странно.

      Проблема в том, что в черновике я использовал другое название - Process one-by-one и пробовал искать именно в этом виде, не нашел. В процессе обсуждения этого дела с @yevgen на github совместно пришли к другому названию, которое я уже не гуглил.

      > На самом деле появляется очень много всего нового. Но, я считаю именно так, что нет смысла писать, если нечего добавить.

      Еще раз, для меня самого это оказалось новым, благодаря неумению гуглить или еще чему-то. Думаю, что для кого-то другого - тоже.

      Удалить
    2. @hazzik, а я считаю, что не надо столь агресивно и резко высказывать своё мнение. Как уже правильно сказал Сергей, в нашей отрасли очень сложно написать что-то абсолютно новое. Огромное количество людей сочли этот пост скорее полезным, чем бесполезным. Так что автору спасибо, а вам "-1" за пафосное поведение!
      Если блог не кажется полезным, кажется сплошной копипастой, то просто не ходите сюда. А если уж пришли, то подобные мнения держите при себе.

      Удалить
  3. А не нужно заменять ToList() на ToDictionary(task => task.Id), что бы удаление из списка не превращалось бы в полный перебор?

    ОтветитьУдалить
    Ответы
    1. Да, это поможет, но тут проблема не столько в линейном поиске задачи, сколько в том, что на каждой итерации Task.WhenAny создает N продолжений, для определения завершенной задачи (что дает O(n^2) количество продолжений в целом).

      Я вот сюда залил версию, которая создает только N продолжений всего: https://github.com/SergeyTeplyakov/TplTipsAndTricks/blob/master/src/TplTipsAndTricks/ProcessTasksByCompletion/TaskEx.cs#L94

      Удалить
  4. Метод TaskEx.FromResult -> TaskEx.FromTask. Сереж, хотел спросить зачем ты task->tsk сокращаешь, чего выигрываешь? :). Ну и по чесноку, решение с Rx гораздо декларативнее, не говоря уже о компактности. И не знаю, стоит или нет, но посмотри на коммент в последнем примере. Приходится упоминать, что есть taskResult. C точки зрения использования(практического) ProcessByCompletion очень ок, но только если не вдаваться как внутри все устроено.

    ОтветитьУдалить
    Ответы
    1. > Метод TaskEx.FromResult -> TaskEx.FromTask.
      Пасиб!

      > хотел спросить зачем ты task->tsk сокращаешь, чего выигрываешь?
      Чтобы не конфликтовать с другим именем в этой же области видимости (видимо, конфликт был в одной из предыдущих реализаций, а имя осталось старое). Нужно поправить.

      > Ну и по чесноку, решение с Rx гораздо декларативнее, не говоря уже о компактности.
      Есть два момента: во-первых, Rx-вый вариант не эквивалентен Task-based варианту, поскольку первый дает синхронный MoveNext. Буду рад если покажешь, как сделать возможность await-ить новое значение с помощью Rx-ов в этом случае.

      Ну и второе: декларитивной является реализация, ведь OrderByCompletion все равно более декларативный вариант, чем ToObservable+Merge. Правильно?

      > C точки зрения использования(практического) ProcessByCompletion очень ок, но только если не вдаваться как внутри все устроено.
      Так у нас же все так устроено. LINQ - это большая куча фигни, спрятанной за красивой оберткой. Если обертка ведет себя хорошо, а реализация не в доску дремучая - то все ок:))

      Удалить
    2. >Есть два момента: во-первых, Rx-вый вариант не эквивалентен Task-based варианту, поскольку первый дает синхронный MoveNext. Буду рад если покажешь, как сделать возможность await-ить новое значение с помощью Rx-ов в этом случае.

      Если честно, то я не понял что тут написано.

      Могу сказать, что внешнее поведение Rx и ProcessByCompletion абсолютно одинаковое.

      Удалить
    3. В случае с тасками, сам метод ProcessByCompletion возвращает Task и не будет заблокирован при ожидании нового результата. Метод MoveNext, который будет дернут внутри foreach-а сразу же вернет новую task-у, которую я внутри цикла уже await-аю.

      В случае с Rx-ами сам метод у меня возвращает void (в примере на гисте он все еще возвращает task-у, но смысла в этом нет, поскольку внутри метода нет await-а). В этом случае метод MoveNext внутри foreach-а будет заблокирован до момента прихода результатов первой таски.

      Это мелочь, но интересно было бы получить аналогичное поведение, как и в случае с тасками.

      Удалить
    4. Ок, понял.

      Оно заблокированно потому что ToEnumerable() материализует RX-запрос слишком поздно. Можно добавит ToList()/ToArray/AsEnumerable где-то перед Merge, но после Select (который с асинхронной лямбдой), а ToEnumerable перенести ближе к foreach.

      .Select(...).ToArray().ToObservable().Merge()
      .Select(...).Select(t => t.ToObservable()).ToArray().Merge()

      Либо можно делать Do, и тогда все пучком.

      Обновленный гист: https://gist.github.com/hazzik/fec7d0a80757b5f1a236

      Удалить
  5. Хорошая штука, в хозяйстве пригодится. Описывай таких паттернов/трюков ещё, если наткнёшься, даже если они могут быть давно известны, хотя бы потому что твоё описание может понятнее получиться.

    ОтветитьУдалить
    Ответы
    1. Пасибки! Есть еще в копилке несколько. Буду описывать:))

      Удалить
  6. Сергей, почему бы коду, решающему исходную "задачу" не выглядеть вот так:

    var weathers = cities.SelectParallelly(GetWeather);

    foreach (var w in waethers.WithForcedParllelismOf(ConnectionCount))
    {
    Console.WriteLine(w);
    }

    Наивная реализация могла бы фетчить элементы из энумератора, в нескольких потоках делать долгое действие и складывать результаты в BlockingCollection, откуда бы они вытягивались "опустошающим" энумератором.

    По-моему код в 500 раз более сконценрирован на прикладной задаче.

    Я нуб, будьте снисходительны.

    ОтветитьУдалить
    Ответы
    1. Андрей, предложенное решение вполне рабочее, но есть небольшая разница в поведении: если WithForcedParallelismOf будет возвращать GetConsumingEnumerable, то текущий поток будет блокирован до тех пор, пока результат не будет получен. В моем же варианте возвращаются именно таски, но они будут завершаться в нужном порядке.

      > По-моему код в 500 раз более сконценрирован на прикладной задаче.

      Я тут уже пару раз писал, еще раз подчеркну: сложность скрытая за простым фасадом не увеличивает сложность всего решения (до тех пор, пока построенная абстракция не даст течь). Поэтому если утилитный метод сложнее, но дает больше гарантий, то вполне нормально использовать его. Ведь в современных библиотеках уже сосредоточена сложность, с которой не сможет справитья ни один живущий на свете программист, но это никого не напрягает.

      Удалить
  7. >> Отказ от Query Comprehension Syntax-а позволит упростить последний пример еще немного:
    Как я понимаю, тут не немного:
    1. OrderByCompletion будет расширять IEnumerable> а не IEnumerable. И это семантически верно.
    2. Из OrderByCompletion выкинут параметр taskSelector
    3. FromTask выкинут

    ОтветитьУдалить
  8. Сергей, спасибо, статья безусловно полезная. Но раз уж зашла речь о том, был ли подход ранее описан, то, наверное, да. Например, в этой книге http://www.apress.com/9781430259206 на странице 156. Заголовок - Improved when any. Подход 1 в 1 как у Тауба и не страдает проблемой O(n^2) продолжений.

    ОтветитьУдалить