вторник, 7 июля 2015 г.

Идиома ForEachAsync

Давайте продолжим рассматривать приемы, которые будут полезными при работе с TPL.

В прошлый раз мы рассмотрели идиому, которая позволяет обрабатывать результаты в порядке окончания работы задач, а не в порядке их запуска. Но там был пропущен один интересный момент. Вот, например, у нас есть все тот же сервис погоды и желание получить результаты по всем городам как можно быстрее. Означает ли это, что можно взять все города мира и послать одновременно тысячи запросов? Сервис погоды может посчитать, что клиент сошел с ума и попробует затроттлить (throttle) запросы, превышающие определенный лимит (кстати, этот самый троттлинг – это один большой pain in the ass для всех облачных сервисов, причем как для авторов сервисов, так и для клиентов этих самых сервисов).

Так вот, нам нужно как-то ограничить число таких запросов или каких-то других асинхронных операций.

Вообще, с ограничением числа задач есть одна небольшая беда. В случае CPU Intensive операций (числодробилки, операции, которые нагружают CPU/GPU) есть простая эвристика – число работающих задач должно быть ограничено числом вычислительных устройств. Но, в случае с IO Intensive операциями таких ограничений нет. Более того, нет и встроенных инструментов для контроля за числом таких операций.

ПРИМЕЧАНИЕ
Пул потоков, теоретически, может помочь в этом плане, когда в системе исполняется смесь из IO Bound и CPU Bound операций. У него есть куча эвристик, которые пытаются подобрать оптимальное число одновременно работающих для обеспечения максимальной пропускной способности (throughput) по переработке задач. Но эти эвристики мало чем помогут, когда у нас есть большое число длительных IO Bound операций и только мы знаем, когда наш backend начнет отпадать от перегрузки. Если интересно, то есть просто замечательная статья по устройству пула потоков в .NET: Throttling Concurrency in the CLR 4.0 ThreadPool.

Итак, нам нужен метод, например, ForEachAsync, который берет последовательность элементов и фабричный метод для запуска задач. Ну и он же сможет ограничивать число одновременных операций.

public static IEnumerable<Task<TTask>> ForEachAsync<TItem, TTask>(
   
this IEnumerable<TItem> source, Func<TItem, Task<TTask>> selector
,
   
int degreeOfParallelism
)
{
   
Contract.Requires(source != null
);
   
Contract.Requires(selector != null
);

   
// Материализируем последовательность
    var tasks = source.ToList
();

   
int completedTask = -1
;

   
// Массив TaskCompletionSource будет хранить результаты всех операций
    var taskCompletions = new TaskCompletionSource<TTask>[tasks.Count
];
   
for (int n = 0; n < taskCompletions.Length; n++
)
       
taskCompletions[n] = new TaskCompletionSource<TTask>
();

   
// Partitioner сделает за нас всю черную работу по ограничению
    // числа одновременных обработчиков
    foreach (var partition in
        Partitioner.Create(tasks).GetPartitions(degreeOfParallelism
))
    {
       
var p = partition
;

       
// Теряем контекст синхронизации и запускаем обработку
        // каждой партиции асинхронно
        Task.Run(async () =>
        {
           
while (p.MoveNext
())
            {
               
var task = selector(p.Current
);

               
// Хитрым образом подавляем исключения
                await task.ContinueWith(_ =>
{ });

               
int finishedTaskIndex = Interlocked.Increment(ref completedTask
);
               
taskCompletions[finishedTaskIndex].FromTask(task
);
            }
        });
    }

   
return taskCompletions.Select(tcs => tcs.Task);
}

Выглядит немного жутковато, но, не все так плохо!

Существует Over 9000 способов ограничить число одновременных операций. И первым в этом списке идет семафор. С ним все хорошо, и его можно использовать для этой задачи, но можно взять что-то более высокоуровневое, например, класс Partitioner из TPL.

Partitioner.Create(source) возвращает объект, который умеет делить входную последовательность для параллельной обработки. Алгоритм «деления» может быть разный и зависит он от типа последовательности/коллекции, и не является интересным в нашем случае. Главное, partitioner позволяет получить несколько «итератров», которые работать могут параллельно, каждый со своим куском входной последовательности.

Параллельная обработка – это хорошо, но нам нужно где-то хранить результаты. Для этого, в начале метода идет «материализация» последовательности в список и создается массив объектов TaskCompletionSource для каждой будущей задачи. Затем, мы запускаем параллельную обработку каждой партиции, и «добавляем» результаты в массив TaskCompletionSource-ов по мере их поступления.

private Task<Weather> GetWeatherForAsync(string city)
{
   
Console.WriteLine("[{1}]: Getting the weather for '{0}'", city
,
       
DateTime.Now.ToLongTimeString
());
   
return WeatherService.GetWeatherAsync(city
);
}

[
Test]
public async Task ForEachAsync
()
{
   
var cities = new List<string> { "Moscow", "Seattle", "New York", "Kiev"
};

   
var tasks = cities.ForEachAsync(async city =>
    {
       
return new { City = city, Weather = await GetWeatherForAsync(city
) };
    },
2
);

   
foreach (var task in tasks
)
    {
       
var taskResult = await task
;

       
ProcessWeather(taskResult.City, taskResult.Weather
);
    }
}

private void ProcessWeather(string city, Weather weather
)
{
   
Console.WriteLine("[{2}]: Processing weather for '{0}': '{1}'", city, weather
,
       
DateTime.Now.ToLongTimeString());
}

И вот результаты исполнения:

[1:22:09 PM]: Getting the weather for 'Moscow'
[1:22:09 PM]: Getting the weather for 'Seattle'
-- Включилось ограничение числа задач! Ждем окончания первой задачи!
[1:22:10 PM]: Processing weather for 'Moscow': 'Temp: 6C'
-- Сразу после окончания одной задачи запускаем следующую
[1:22:10 PM]: Getting the weather for 'New York'
-- Самая новая задача завершилась первой
[1:22:15 PM]: Processing weather for 'New York': 'Temp: 8C'
-- Запускаем следующую
[1:22:15 PM]: Getting the weather for 'Kiev'
-- Только теперь завершилась вторая задача
[1:22:16 PM]: Processing weather for 'Seattle': 'Temp: 7C'
-- И теперь - последняя
[1:22:20 PM]: Processing weather for 'Kiev': 'Temp: 4C'

И в графическом виде:

clip_image002

Получается, что эта штука не просто ограничивает число одновременных операций, но и позволяет обрабатывать результаты в порядке их завершения, а не в порядке их запуска! Красота!

Что возвращать? Task или IEnumerable<Task>?

Камрад Тауб описывал реализацию ForEachAsync, аж в двух частях (Implementing a simple ForEachAsync и Implementing a simple ForEachAsync , part 2). Но его реализация несколько иная. Главное отличие ее в том, что метод ForEachAsync возвращает Task, а не IEnumerable<Task>. И это может быть весьма важным отличием:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
   
return Task.WhenAll
(
       
from partition in Partitioner.Create(source).GetPartitions(dop
)
       
select Task.Run(async delegate
        {
           
using (partition
)
               
while (partition.MoveNext
())
                   
await body(partition.Current);
        }));
}

Код короче (что хорошо!), но ведет себя по-другому (и тут, ХЗ, насколько это хорошо). Во-первых, этот подход работает только для команд, но не работает для запросов (команда – это мутатор, запрос – это геттер, подробности – в правильной книге). Во-вторых, наличие лишь одного результата сильно усложняет процесс обработки ошибок даже в случае ограничение таких операций, как сохранение данных.

Например, в данном случае обработка полностью остановится, когда исключение произойдет в каждой из существующих партиций! Так, когда произойдет первая ошибка и await body завершится с ошибкой, то цикл while оборвется, и будет вызван Dispose на объекте partition. Партиционирование реализовано с использованием идиомы work stealing, а это значит, что текущие элементы (логически) будут добавлены в очередь на обработку другой партицией. Если упадет обработка еще одной партици, то количество обработчиков еще уменьшится. И так до тех пор, пока все партиции не свалятся с ошибками. В лучшем случае вы получите лишь часть обработанных данных, а в худшем случае – проблемы с эффективностью, из-за того, что у вас будет гораздо меньше активных обработчиков задач, чем вы думали.

Решить задачу можно путем накопления ошибок и генерации AggregateException:

public static async Task ForEachAsyncWithExceptions<T>(
   
this IEnumerable<T> source, int dop, Func<T, Task> body
)
{
   
ConcurrentQueue<Exception> exceptions = null
;
           
   
await Task.WhenAll
(
       
from partition in Partitioner.Create(source).GetPartitions(dop
)
       
select Task.Run(async delegate
        {
           
using (partition
)
            {
               
while (partition.MoveNext
())
                {
                   
try
                    {
                       
await body(partition.Current
);
                    }
                   
catch (Exception e
)
                    {
                       
LazyInitializer
                           .EnsureInitialized(ref exceptions).Enqueue(e));
                    }
                }
            }
        }));
 
   
if (exceptions != null
)
    {
       
throw new AggregateException(exceptions);
    }
}

Этот подход полностью рабочий, хотя и остается вопрос с ассоциацией задачи и возникшей ошибки.

Дополнительные ссылки

9 комментариев:

  1. Нет предела совершенству. Все легче использовать и все хуже разбираться в кишках...

    ОтветитьУдалить
    Ответы
    1. Ага. Хотя из особенностей здесь только Partitioner, который можено рассматривать как коллекцию итераторов для параллельного обхода коллекции.

      Удалить
  2. Первый вариант ForEachAsync работает с запросом/геттером и возвращает коллекцию, а второй вариант ForEachAsync работает с командой/мутатором и возвращает только Task. Не правильнее было бы назвать первый вариант SelectAsync? Ведь даже один из параметров назван selector.

    ОтветитьУдалить
    Ответы
    1. Мысль. Нужно ее подумать:)

      Удалить
    2. Согласованность с существующими идиомами языка/стэка — это всегда приятно. Я, конечно, имею в виду LINQ и PLINQ, где такая операция названа именно Select. ForEach в LINQ вообще нет, но есть List.ForEach(), и он принимает именно Action, т.е. команду/мутатор.
      Более того, почему бы в качестве дальнейшего развития этой идеи не воплотить и остальные операции (WhereAsync и т.п.)? Получится нечто вроде PLINQ, но заточенное под новые реалии async/await.

      Удалить
  3. Ты хотя бы напиши, что люди должны (именно должны) использовать реализацию из RX (точнее из IX), а не пилить свою.

    ОтветитьУдалить
    Ответы
    1. А как бы выглядело решение с помощью RX?

      Удалить
  4. Вопрос:
    допустим есть последовательность из 10 элементов. Скажем Parititoner разбивает ее на 3 по 3 элемента (dop == 3). Итого запускается 3 "фоновые" операции по обработке каждой партиции. Допустим первая партиция отработала достаточно быстро - скажем за 2 секунды, а остальные две будут работать значительно дольше - скажем по 10 секунд. Получается через 2 секунды (отработала первая партиция) степень параллелизма станет уменьшится до двух и параллельных запросов погоды уже будет не три? Хуже того: а если две из трех партиций отработают за 2 секунды, а в третьей, запрос скажем на Москву, будет выполняться 10 секунд - получается остальные 2 элемента этой партиции будут ждать своей очереди выполняясь последовательно, а по логике должны продолжить выполняться параллельно dop = 3.

    Или я не правильно понимаю работу Partitioner и внутри происходит "магия"?

    ОтветитьУдалить
    Ответы
    1. Partitioner достаточно умный (точнее там есть несколько стратегий партициионирования) и по умолчанию будет происходить автоматическая баллансировка, чтобы все партициии были в работе.

      Удалить