Давайте продолжим рассматривать приемы, которые будут полезными при работе с TPL.
В прошлый раз мы рассмотрели идиому, которая позволяет обрабатывать результаты в порядке окончания работы задач, а не в порядке их запуска. Но там был пропущен один интересный момент. Вот, например, у нас есть все тот же сервис погоды и желание получить результаты по всем городам как можно быстрее. Означает ли это, что можно взять все города мира и послать одновременно тысячи запросов? Сервис погоды может посчитать, что клиент сошел с ума и попробует затроттлить (throttle) запросы, превышающие определенный лимит (кстати, этот самый троттлинг – это один большой pain in the ass для всех облачных сервисов, причем как для авторов сервисов, так и для клиентов этих самых сервисов).
Так вот, нам нужно как-то ограничить число таких запросов или каких-то других асинхронных операций.
Вообще, с ограничением числа задач есть одна небольшая беда. В случае CPU Intensive операций (числодробилки, операции, которые нагружают CPU/GPU) есть простая эвристика – число работающих задач должно быть ограничено числом вычислительных устройств. Но, в случае с IO Intensive операциями таких ограничений нет. Более того, нет и встроенных инструментов для контроля за числом таких операций.
ПРИМЕЧАНИЕ
Пул потоков, теоретически, может помочь в этом плане, когда в системе исполняется смесь из IO Bound и CPU Bound операций. У него есть куча эвристик, которые пытаются подобрать оптимальное число одновременно работающих для обеспечения максимальной пропускной способности (throughput) по переработке задач. Но эти эвристики мало чем помогут, когда у нас есть большое число длительных IO Bound операций и только мы знаем, когда наш backend начнет отпадать от перегрузки. Если интересно, то есть просто замечательная статья по устройству пула потоков в .NET: Throttling Concurrency in the CLR 4.0 ThreadPool.
Итак, нам нужен метод, например, ForEachAsync, который берет последовательность элементов и фабричный метод для запуска задач. Ну и он же сможет ограничивать число одновременных операций.
public static IEnumerable<Task<TTask>> ForEachAsync<TItem, TTask>(
this IEnumerable<TItem> source, Func<TItem, Task<TTask>> selector,
int degreeOfParallelism)
{
Contract.Requires(source != null);
Contract.Requires(selector != null);
// Материализируем последовательность
var tasks = source.ToList();
int completedTask = -1;
// Массив TaskCompletionSource будет хранить результаты всех операций
var taskCompletions = new TaskCompletionSource<TTask>[tasks.Count];
for (int n = 0; n < taskCompletions.Length; n++)
taskCompletions[n] = new TaskCompletionSource<TTask>();
// Partitioner сделает за нас всю черную работу по ограничению
// числа одновременных обработчиков
foreach (var partition in
Partitioner.Create(tasks).GetPartitions(degreeOfParallelism))
{
var p = partition;
// Теряем контекст синхронизации и запускаем обработку
// каждой партиции асинхронно
Task.Run(async () =>
{
while (p.MoveNext())
{
var task = selector(p.Current);
// Хитрым образом подавляем исключения
await task.ContinueWith(_ => { });
int finishedTaskIndex = Interlocked.Increment(ref completedTask);
taskCompletions[finishedTaskIndex].FromTask(task);
}
});
}
return taskCompletions.Select(tcs => tcs.Task);
}
Выглядит немного жутковато, но, не все так плохо!
Существует Over 9000 способов ограничить число одновременных операций. И первым в этом списке идет семафор. С ним все хорошо, и его можно использовать для этой задачи, но можно взять что-то более высокоуровневое, например, класс Partitioner из TPL.
Partitioner.Create(source) возвращает объект, который умеет делить входную последовательность для параллельной обработки. Алгоритм «деления» может быть разный и зависит он от типа последовательности/коллекции, и не является интересным в нашем случае. Главное, partitioner позволяет получить несколько «итератров», которые работать могут параллельно, каждый со своим куском входной последовательности.
Параллельная обработка – это хорошо, но нам нужно где-то хранить результаты. Для этого, в начале метода идет «материализация» последовательности в список и создается массив объектов TaskCompletionSource для каждой будущей задачи. Затем, мы запускаем параллельную обработку каждой партиции, и «добавляем» результаты в массив TaskCompletionSource-ов по мере их поступления.
private Task<Weather> GetWeatherForAsync(string city)
{
Console.WriteLine("[{1}]: Getting the weather for '{0}'", city,
DateTime.Now.ToLongTimeString());
return WeatherService.GetWeatherAsync(city);
}
[Test]
public async Task ForEachAsync()
{
var cities = new List<string> { "Moscow", "Seattle", "New York", "Kiev" };
var tasks = cities.ForEachAsync(async city =>
{
return new { City = city, Weather = await GetWeatherForAsync(city) };
}, 2);
foreach (var task in tasks)
{
var taskResult = await task;
ProcessWeather(taskResult.City, taskResult.Weather);
}
}
private void ProcessWeather(string city, Weather weather)
{
Console.WriteLine("[{2}]: Processing weather for '{0}': '{1}'", city, weather,
DateTime.Now.ToLongTimeString());
}
И вот результаты исполнения:
[1:22:09 PM]: Getting the weather for 'Moscow'
[1:22:09 PM]: Getting the weather for 'Seattle'
-- Включилось ограничение числа задач! Ждем окончания первой задачи!
[1:22:10 PM]: Processing weather for 'Moscow': 'Temp: 6C'
-- Сразу после окончания одной задачи запускаем следующую
[1:22:10 PM]: Getting the weather for 'New York'
-- Самая новая задача завершилась первой
[1:22:15 PM]: Processing weather for 'New York': 'Temp: 8C'
-- Запускаем следующую
[1:22:15 PM]: Getting the weather for 'Kiev'
-- Только теперь завершилась вторая задача
[1:22:16 PM]: Processing weather for 'Seattle': 'Temp: 7C'
-- И теперь - последняя
[1:22:20 PM]: Processing weather for 'Kiev': 'Temp: 4C'
И в графическом виде:
Получается, что эта штука не просто ограничивает число одновременных операций, но и позволяет обрабатывать результаты в порядке их завершения, а не в порядке их запуска! Красота!
Что возвращать? Task или IEnumerable<Task>?
Камрад Тауб описывал реализацию ForEachAsync, аж в двух частях (Implementing a simple ForEachAsync и Implementing a simple ForEachAsync , part 2). Но его реализация несколько иная. Главное отличие ее в том, что метод ForEachAsync возвращает Task, а не IEnumerable<Task>. И это может быть весьма важным отличием:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Код короче (что хорошо!), но ведет себя по-другому (и тут, ХЗ, насколько это хорошо). Во-первых, этот подход работает только для команд, но не работает для запросов (команда – это мутатор, запрос – это геттер, подробности – в правильной книге). Во-вторых, наличие лишь одного результата сильно усложняет процесс обработки ошибок даже в случае ограничение таких операций, как сохранение данных.
Например, в данном случае обработка полностью остановится, когда исключение произойдет в каждой из существующих партиций! Так, когда произойдет первая ошибка и await body завершится с ошибкой, то цикл while оборвется, и будет вызван Dispose на объекте partition. Партиционирование реализовано с использованием идиомы work stealing, а это значит, что текущие элементы (логически) будут добавлены в очередь на обработку другой партицией. Если упадет обработка еще одной партици, то количество обработчиков еще уменьшится. И так до тех пор, пока все партиции не свалятся с ошибками. В лучшем случае вы получите лишь часть обработанных данных, а в худшем случае – проблемы с эффективностью, из-за того, что у вас будет гораздо меньше активных обработчиков задач, чем вы думали.
Решить задачу можно путем накопления ошибок и генерации AggregateException:
public static async Task ForEachAsyncWithExceptions<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
ConcurrentQueue<Exception> exceptions = null;
await Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
{
while (partition.MoveNext())
{
try
{
await body(partition.Current);
}
catch (Exception e)
{
LazyInitializer
.EnsureInitialized(ref exceptions).Enqueue(e));
}
}
}
}));
if (exceptions != null)
{
throw new AggregateException(exceptions);
}
}
Этот подход полностью рабочий, хотя и остается вопрос с ассоциацией задачи и возникшей ошибки.
Нет предела совершенству. Все легче использовать и все хуже разбираться в кишках...
ОтветитьУдалитьАга. Хотя из особенностей здесь только Partitioner, который можено рассматривать как коллекцию итераторов для параллельного обхода коллекции.
УдалитьProgramming Stuff: Идиома Foreachasync >>>>> Download Now
Удалить>>>>> Download Full
Programming Stuff: Идиома Foreachasync >>>>> Download LINK
>>>>> Download Now
Programming Stuff: Идиома Foreachasync >>>>> Download Full
>>>>> Download LINK C8
Первый вариант ForEachAsync работает с запросом/геттером и возвращает коллекцию, а второй вариант ForEachAsync работает с командой/мутатором и возвращает только Task. Не правильнее было бы назвать первый вариант SelectAsync? Ведь даже один из параметров назван selector.
ОтветитьУдалитьМысль. Нужно ее подумать:)
УдалитьСогласованность с существующими идиомами языка/стэка — это всегда приятно. Я, конечно, имею в виду LINQ и PLINQ, где такая операция названа именно Select. ForEach в LINQ вообще нет, но есть List.ForEach(), и он принимает именно Action, т.е. команду/мутатор.
УдалитьБолее того, почему бы в качестве дальнейшего развития этой идеи не воплотить и остальные операции (WhereAsync и т.п.)? Получится нечто вроде PLINQ, но заточенное под новые реалии async/await.
Ты хотя бы напиши, что люди должны (именно должны) использовать реализацию из RX (точнее из IX), а не пилить свою.
ОтветитьУдалитьА как бы выглядело решение с помощью RX?
УдалитьВопрос:
ОтветитьУдалитьдопустим есть последовательность из 10 элементов. Скажем Parititoner разбивает ее на 3 по 3 элемента (dop == 3). Итого запускается 3 "фоновые" операции по обработке каждой партиции. Допустим первая партиция отработала достаточно быстро - скажем за 2 секунды, а остальные две будут работать значительно дольше - скажем по 10 секунд. Получается через 2 секунды (отработала первая партиция) степень параллелизма станет уменьшится до двух и параллельных запросов погоды уже будет не три? Хуже того: а если две из трех партиций отработают за 2 секунды, а в третьей, запрос скажем на Москву, будет выполняться 10 секунд - получается остальные 2 элемента этой партиции будут ждать своей очереди выполняясь последовательно, а по логике должны продолжить выполняться параллельно dop = 3.
Или я не правильно понимаю работу Partitioner и внутри происходит "магия"?
Partitioner достаточно умный (точнее там есть несколько стратегий партициионирования) и по умолчанию будет происходить автоматическая баллансировка, чтобы все партициии были в работе.
УдалитьProgramming Stuff: Идиома Foreachasync >>>>> Download Now
ОтветитьУдалить>>>>> Download Full
Programming Stuff: Идиома Foreachasync >>>>> Download LINK
>>>>> Download Now
Programming Stuff: Идиома Foreachasync >>>>> Download Full
>>>>> Download LINK ht
Насколько это теперь актуально в свете появления в .Net Core Parallel.ForEachAsync?
ОтветитьУдалить