Показаны сообщения с ярлыком Concurrency. Показать все сообщения
Показаны сообщения с ярлыком Concurrency. Показать все сообщения

вторник, 7 июля 2015 г.

Идиома ForEachAsync

Давайте продолжим рассматривать приемы, которые будут полезными при работе с TPL.

В прошлый раз мы рассмотрели идиому, которая позволяет обрабатывать результаты в порядке окончания работы задач, а не в порядке их запуска. Но там был пропущен один интересный момент. Вот, например, у нас есть все тот же сервис погоды и желание получить результаты по всем городам как можно быстрее. Означает ли это, что можно взять все города мира и послать одновременно тысячи запросов? Сервис погоды может посчитать, что клиент сошел с ума и попробует затроттлить (throttle) запросы, превышающие определенный лимит (кстати, этот самый троттлинг – это один большой pain in the ass для всех облачных сервисов, причем как для авторов сервисов, так и для клиентов этих самых сервисов).

Так вот, нам нужно как-то ограничить число таких запросов или каких-то других асинхронных операций.

Вообще, с ограничением числа задач есть одна небольшая беда. В случае CPU Intensive операций (числодробилки, операции, которые нагружают CPU/GPU) есть простая эвристика – число работающих задач должно быть ограничено числом вычислительных устройств. Но, в случае с IO Intensive операциями таких ограничений нет. Более того, нет и встроенных инструментов для контроля за числом таких операций.

воскресенье, 28 июня 2015 г.

Идиома Process Tasks By Completion

При работе с тасками часто возникает такая задача: у нас есть набор входных данных и для обработки каждого элемента используется длительная операция.

Можно подойти к этой задаче в лоб. Крутим цикл, запускаем таски, обрабатываем результаты по одному:

private Task<Weather> GetWeatherForAsync(string city)
{
   
Console.WriteLine("[{1}]: Getting the weather for '{0}'"
, city,
       
DateTime.Now.
ToLongTimeString());
   
return WeatherService.
GetWeatherAsync(city);
}


[
Test]
public async Task
ProcessOneByOneNaive()
{
   
var cities = new List<string> { "Moscow", "Seattle", "New York"
};

   
var tasks =
        from city in
cities
       
select new { City = city, WeatherTask =
GetWeatherForAsync(city) };

   
foreach (var entry in
tasks)
    {
       
var wheather = await entry.
WeatherTask;

        ProcessWeather(entry
.
City, wheather);
    }
}

private void ProcessWeather(string city, Weather
weather)
{
   
Console.WriteLine("[{2}]: Processing weather for '{0}': '{1}'"
, city, weather,
       
DateTime.Now.ToLongTimeString());
}

Здесь мы обращаемся к некоторому сервису погоды, для получения температуры в каждом городе, затем обрабатываем полученные результаты путем вывода города и температуры на экран.

четверг, 18 июня 2015 г.

Небольшой трюк при работе с ConcurrentDictionary

У использования ConcurentDictionary есть одна особенность: в некоторых случаях он может вести себя не совсем так, как вы того ожидаете. Вот небольшой пример. Допустим, нам нужно запилить небольшой кэш, чтобы результаты дорогостоящей операции брались из кэша, если они там есть, ну и добавлялись в кэш прозрачным образом, если Акела промахнулся.

Простая реализация некоторого провайдера с cache aside паттерном будет выглядеть примерно так:

public class CustomProvider
{
   
private readonly ConcurrentDictionary<string, OperationResult> _cache =
        new ConcurrentDictionary<string, OperationResult>
();

   
public OperationResult RunOperationOrGetFromCache(string
operationId)
    {
       
return _cache.GetOrAdd(operationId, id =>
RunLongRunningOperation(id));
    }

   
private OperationResult RunLongRunningOperation(string
operationId)
    {
       
// Running real long-running operation
        // ...
        Thread.Sleep(10
);
       
Console.WriteLine("Running long-running operation"
);
       
return OperationResult.Create(operationId);
    }
}