понедельник, 27 сентября 2010 г.

[Перевод] Джозеф Албахари. Часть 5.2. Параллельное программирование

Это перевод второй (и заключительной) части статьи Джозефа Албахари (Joseph Albahari) о работе с потоками в C# - “Part 5: Parallel Programming”.

Распараллеливание агрегатных функций

PLINQ эффективно распараллеливает такие операторы как Sum, Average, Min и Max без дополнительного вмешательства. Оператор Aggregate предоставляет особые дополнительные возможности в PLINQ.

Если вы не знакомы с этим оператором, вы можете рассматривать его как обобщенную версию операторов Sum, Average, Min и Max, другими словами, это оператор, который позволяет добавить вам специализированный алгоритм аккумулирования (accumulation) значений для реализации нетиповых агрегаций.

int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n);   // 6

Первый аргумент метода Aggregate – это начальное значение (seed), с которого начинается аккумулирование. Второй аргумент – это выражение, которое обновляет накопленное значение и возвращающее новый элемент. Вы можете передать третий необязательный параметр, для получения проекции результата из накопленного значения.

ПРИМЕЧАНИЕ
Большинство задач, для которых предназначен метод Aggregate, могут быть легко решены с помощью более знакомого синтаксиса – цикла foreach. Преимущество использования Aggregate заключается в том, что большая и сложная агрегация может быть распараллелена декларативным образом.

Агрегации без начального значения

Вы можете опустить начальное значение (seed value) при вызове метода Aggregate, в таком случае первый элемент входной последовательности неявно будет являться начальным значением, и агрегация будет продолжена со второго элемента. Вот предыдущий пример, без начального значения:

int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate ((total, n) => total + n);   // 6

Это дает аналогичный результат, но на самом деле, вычисления выполняются другие. В первом случае мы вычисляли 0 + 1 + 2 + 3; сейчас мы вычисляем: 1 + 2 + 3. Эти различия более показательны при использовании умножения вместо сложения:

int[] numbers = { 1, 2, 3 };
int x = numbers.Aggregate (0, (prod, n) => prod * n);   // 0*1*2*3 = 0
int y = numbers.Aggregate (   (prod, n) => prod * n);   //   1*2*3 = 6

Как мы вскоре увидим, агрегации без начального значения могут быть распараллелены без использования дополнительных перегруженных версий этого метода. Однако здесь существует одна ловушка: агрегирующие методы без начального значения должны использоваться в методах, которые являются коммутативными и ассоциативными. В противном случае, результат будет либо интуитивно не понятным (с простыми запросами), либо неопределенным (при распараллеливании запросов с помощью PLINQ). Например, рассмотрим следующую функцию:

(total, n) => total + n * n 

Этот делегат не является ни коммутативным, ни ассоциативным. (Например, 1 + 2*2 != 2 + 1*1). Давайте посмотрим на результат его использования для получения суммы квадратов чисел 2, 3, 4:

int[] numbers = { 2, 3, 4 };
int sum = numbers.Aggregate ((total, n) => total + n * n);    // 27

Вместо вычисления:

2*2 + 3*3 + 4*4    // 29 

Он вычисляет

2 + 3*3 + 4*4      // 27 

Мы можем исправить это несколькими способами. Во-первых, мы можем добавить 0, в качестве первого элемента

int[] numbers = { 0, 2, 3, 4 }; 

Но это не только неэлегантно, но это также приведет к некорректному результату при параллельном выполнении, поскольку PLINQ предполагает наличие ассоциативности и выбирает несколько элементов в качестве начального значения. Для демонстрации этого, давайте определим нашу агрегирующую функцию следующим образом:

f(total, n) => total + n * n

LINQ to Objects вычисляет это таким образом:

f(f(f(0, 2),3),4) 

а PLINQ может вычислять так:

f(f(0,2),f(3,4)) 

со следующими результатами:

Первая партиция (partition):   a = 0 + 2*2  (= 4)
Вторая партиция:   b = 3 + 4*4  (= 19)
Конечный результат:           a + b*b  (= 365)
ИЛИ ДАЖЕ:                 b + a*a  (= 3

Существует два хороших решения. Первой решение заключается в применении агрегации с начальным значением, передав в качестве начального значения 0. Единственная сложность с PLINQ заключается в том, что если мы хотим, чтобы запрос не выполнялся последовательно, нужно использовать специальную перегруженную версию (как мы вскоре увидим это).

Второе решение заключается в преобразовании запроса таким образом, чтобы агрегирующая функция стала коммутативной и ассоциативной:

int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n); 

ПРИМЕЧАНИЕ
Конечно, в таком простом сценарии вы можете (и должны) использовать оператор Sum, вместо Aggregate:

int sum = numbers.Sum (n => n * n);

На самом деле, вы можете много чего сделать с помощью операторов Sum и Aggregate. Например, вы можете использовать Average для вычисления среднеквадратического отклонения.

Math.Sqrt(numbers.Average (n => n * n)) 

Или даже для вычисления обычного отклонения:

double mean = numbers.Average();
double sdev = Math.Sqrt (numbers.Average (n =>
              {
                double dif = n - mean;
                return dif * dif;
              }));

Оба вычисления безопасны, эффективны и полностью распараллеливаемы.

Распараллеливание агрегаций

Мы только что видели, что для агрегаций без начального значения, передаваемый делегат должен быть ассоциативным и коммуникативным. При нарушении этого правила PLINQ будет выдавать неправильные результаты, поскольку он берет несколько начальных значений из входной последовательности для одновременной агрегации различных партиций последовательности.

Применение агрегаций, с явно указанными начальными значениями, может показаться безопасным, но, к сожалению, они обычно вычисляются последовательно из-за зависимости с единственным начальным значением. Для решения этой проблемы в PLINQ есть дополнительная перегруженная версия оператора Aggregate, которая позволяет указать вам несколько начальных значений в виде фабричного метода начальных значений (seed factory function). Каждый поток вызывает эту функцию для получения независимого начального значения, которое становится аккумулятором, локальным для потока, в котором накапливаются локальные элементы.

Вам также нужно задать функцию, которая покажет, каким именно образом будут объединяться локальный и главный аккумуляторы. И последнее, эта перегрузка оператора Aggregate ожидает делегат (зачастую ненужный), который выполняет окончательное преобразования результата (вы можете добиться того же самого путем вызова некоторой функции на результате, полученном после выполнения этого оператора). Итак, вот четыре делегата, в том порядке, в котором вы должны их передать:

  • seedFactory: возвращает новый локальный аккумулятор
  • updateAccumulatorFunc: агрегирует элемент в локальный аккумулятор
  • combineAccumulatorFunc: объединяет локальный и главный аккумуляторы
  • resultSelector: применяет любые преобразования на конечном результате

ПРИМЕЧАНИЕ
В простых случаях вы можете указать начальное значение вместо фабричного метода. Но этот способ не работает, если начальное значение относится к изменяемому (mutating) ссылочному типу (reference type), поскольку в этом случае, один и тот же экземпляр будет совместно использоваться всеми потоками.

В качестве очень простого примера давайте рассмотрим сумму значений массива numbers:

numbers.AsParallel().Aggregate (
  () => 0,                                     // seedFactory
  (localTotal, n) => localTotal + n,           // updateAccumulatorFunc
  (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc
  finalResult => finalResult)                  // resultSelector

Этот пример показывает, что мы так же эффективно можем получить тот же самый ответ с помощью более простых подходов (как, например, с помощью агрегата без указания начального значения или, еще лучше, с помощью оператор Sum). Для демонстрации более реального примера, давайте предположим, что мы хотим вычислить частоту всех букв английского алфавита в определенной строке. Простое решение, которое выполняется последовательно, может быть таким:

string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
  int index = char.ToUpper (c) - 'A';
  if (index >= 0 && index <= 26) letterFrequencies [index]++;
};

ПРИМЕЧАНИЕ
Примером текста очень большого размера может быть последовательность генов. В таком случае «алфавит» будет содержать буквы a, c, g и t.

Для распараллеливания мы можем заменить оператор foreach вызовом метода Parallel.ForEach (как увидим в следующем разделе), но в этом случае нам придется решить проблему параллельного доступа к общему массиву. А блокировка доступа к этому массиву просто убьет весь потенциал для распараллеливания.

Оператор Aggregate предоставляет лучшее решение. Аккумулятором в данном случае будет массив, аналогичный массиву letterFrequencies, из нашего предыдущего примера. Вот последовательная версия с использованием Aggregate:

int[] result =
  text.Aggregate (
    new int[26],                // Создаем «аккумулятор»
    (letterFrequencies, c) =>   // Агрегируем буквы в аккумуляторе
    {
      int index = char.ToUpper (c) - 'A';
      if (index >= 0 && index <= 26) letterFrequencies [index]++;
      return letterFrequencies;
    });

И вот теперь параллельная версия, которая использует специальную перегрузку метода Aggregate:

int[] result =
  text.AsParallel().Aggregate (
    () => new int[26],             // Создаем новый локальный аккумулятор
    (localFrequencies, c) =>       // Добавляем элементы в локальный аккумулятор
    {
      int index = char.ToUpper (c) - 'A';
      if (index >= 0 && index <= 26) localFrequencies [index]++;
      return localFrequencies;
    },
                                   // Объединяем локальный и главный аккумуляторы
    (mainFreq, localFreq) =>
      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
    finalResult => finalResult     // Выполняем окончательное преобразование
  );                               // на конечном результате.
Обратите внимание, что локальные аккумулирующие функции изменяют (mutate) массив localFrequencies. Это является важной оптимизацией, работающей совершенно корректно, поскольку переменная localFrequencies является локальной для каждого потока.

Класс Parallel

PFX представляет базовую форму структурного параллелизма с помощью трех методов класса Parallel:

  • Parallel.Invoke: выполняет параллельно массив делегатов
  • Parallel.For: параллельный эквивалент цикла for
  • Parallel.ForEach: параллельный эквивалент цикла foreach

Все три метода блокируют управление до окончания выполнения всех действий. Как и при использовании PLINQ при возникновении необработанного исключения, оставшиеся рабочие потоки прекращают выполнение после завершения обработки текущего элемента и исключение (или исключения), завернутые в AggregationException, пробрасываются вызывающему коду.

Parallel.Invoke

Метод Parallel.Invoke выполняет массив делегатов типа Action параллельно, и затем ожидает из завершения. Самая простая версия этого метода определена следующим образом:

public static void Invoke (params Action[] actions); 

Вот как мы можем воспользоваться методом Parallel.Invoke для одновременной загрузки двух веб-страниц:

Parallel.Invoke (
() => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
() => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));

С первого взгляда может показаться, что это просто сокращенная форма создания двух объектов класса Task (или асинхронных делегатов) с последующим ожиданием их завершения. Но есть одно существенное различие: метод Parallel.Invoke будет выполняться эффективно даже если вы передадите ему миллион делегатов. Это связано с тем, что он разбивает большое количество элементов на пачки, которые назначаются набору объектов Task, а не создает по отдельному объекту Task для каждого делегата.

При использовании всех методов класса Parallel ответственность за объединение результатов лежит на вас. Это значит, что вы сами должны думать о безопасности потоков. Так, например, следующий код не является потокобезопасным:

var data = new List<string>();
Parallel.Invoke (
() => data.Add (new WebClient().DownloadString ("http://www.foo.com")),
() => data.Add (new WebClient().DownloadString ("http://www.far.com")));

Использование блокировок вокруг добавления в список решит проблему, хотя и создаст узкое место, при большом количестве быстроисполнимых делегатов. В данном случае идеальным решением будет использование потокобезопасных коллекций, таких как ConcurrentBag.

Существует перегруженная версия метода Parallel.Invoke, которая принимает объект класса ParallelOptions.

public static void Invoke (ParallelOptions options,
                           params Action[] actions);

С помощью ParallelOptions вы можете добавить маркер отмены, ограничить максимальное количество рабочих потоков или указать свой планировщик задач (custom task scheduler). Использование маркеров отмены полезно, когда число одновременно выполняемых задач превосходит (примерное) количество ядер процессора: при отмене, все делегаты, выполнение которых еще не было начато, будут отменены. Однако все делегаты, выполнение которых уже начато, продолжат выполнение до завершения. См. раздел Отмена для примеров использования маркеров отмены.

Parallel.For и Parallel.ForEach

Методы Parallel.For и Parallel.ForEach аналогичны C# операторам цикла for и foreach, за исключением того, что итерирование элементов последовательности происходит параллельно, а не последовательно. Вот их (упрощенные) сигнатуры:

public static ParallelLoopResult For (
  int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource> (
  IEnumerable<TSource> source, Action<TSource> body)

Следующий последовательный оператор for:

for (int i = 0; i < 100; i++)
  Foo (i);

Может быть распараллелен так:

Parallel.For (0, 100, i => Foo (i)); 

или еще проще:

Parallel.For (0, 100, Foo); 

А следующий последовательный оператор foreach:

foreach (char c in "Hello, world")
  Foo (c);

может быть распараллелен так:

Parallel.ForEach ("Hello, world", Foo); 

В качестве реального практического примера, если мы воспользуемся пространством имен System.Security.Cryptography, мы сможет сгенерировать шесть пар открытый/закрытый ключ параллельно так:

var keyPairs = new string[6];
Parallel.For (0, keyPairs.Length,
              i => keyPairs[i] = RSA.Create().ToXmlString (true));

С помощью Parallel.Invoke мы может передать в методы Parallel.For и Parallel.ForEach большое количество рабочих элементов, которые будут эффективно разбиты на секции для обработки несколькими задачами.

ПРИМЕЧАНИЕ
Последний запрос может быть также реализован с помощью
PLINQ:

string[] keyPairs =
  ParallelEnumerable.Range (0, 6)
  .Select (i => RSA.Create().ToXmlString (true))
  .ToArray();
Внешние циклы vs внутренние циклы

Методы Parallel.For и Parallel.ForEach обычно лучше работают на внешних, а не внутренних циклах. Это связано с тем, что в первом случае для распараллеливания вы предоставляете блоки (chunks) большего размера, снижая влияние накладных расходов. Распараллеливать одновременно внешние и внутренние циклы обычно не нужно. В следующем примере, нам понадобится более 100 ядер процессора для получения преимуществ от распараллеливания внутреннего цикла:

Parallel.For (0, 100, i =>
{
  Parallel.For (0, 50, j => Foo (i, j));   // Для внутреннего цикла лучше использовать
});                                        // последовательное выполнение
Индексированная версия Parallel.ForEach

Иногда полезно знать индекс текущего элемента внутри цикла. Используя последовательную версию foreach, это сделать просто:

int i = 0;
foreach (char c in "Hello, world")
  Console.WriteLine (c.ToString() + i++);

Однако изменение разделяемой переменной не является потокобезопасной операцией в параллельном контексте. Вместо этого вы должны использовать следующую версию метода ForEach:

public static ParallelLoopResult ForEach<TSource> (
  IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)

Пока не обращайте внимание на ParallelLoopState (который будет рассмотрен в следующем разделе). На данный момент, нас интересует только третий параметр делегата Action с типом long, который как раз и содержит индекс цикла:

Parallel.ForEach ("Hello, world", (c, state, i) =>
{
   Console.WriteLine (c.ToString() + i);
});

Для того чтобы рассмотреть этот метод в реальном контексте давайте изменим наш класс проверки правописания, реализованный с помощью PLINQ. Следующий код загружает словарь вместе с массивом из миллиона тестовых слов:

if (!File.Exists ("WordLookup.txt"))    // Содержит порядка 150,000 слов
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();
wordsToTest [12345] = "woozsh";     // Добавляем несколько
wordsToTest [23456] = "wubsie";     // ошибок правописания

Мы можем выполнить проверку правописания в нашем массиве wordsToTest с помощью индексированной версии метода Parallel.ForEach следующим образом:

var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});

Обратите внимание, что нам нужно самостоятельно объединить результаты в потокобезопасной коллекции: такая необходимость является недостатком по сравнению с использованием PLINQ. Преимуществом использования класса Parallel по сравнению с использованием PLINQ является то, что нам не нужно использовать дорогостоящую индексированную версию оператора запроса Select, которая является менее эффективной, по сравнению с индексированной версией метода ForEach.

ParallelLoopState: ранний выход из циклов

Поскольку телом циклов параллельной версии For или ForEach является делегат, вы не можете закончить выполнение цикла заранее с помощью оператора break. Вместо этого, вы должны вызвать метод Break или Stop объекта ParallelLoopState.

public class ParallelLoopState
{
  public void Break();
  public void Stop();
  public bool IsExceptional { get; }
  public bool IsStopped { get; }
  public long? LowestBreakIteration { get; }
  public bool ShouldExitCurrentIteration { get; }
}

Получить объект ParallelLoopState очень просто: все версии методов For и ForEach содержат перегруженную версию, которая принимает делегат вида Action<TSource, ParallelLoopState> в качестве тела цикла. Таким образом, распараллелить этот код:

foreach (char c in "Hello, world")
  if (c == ',')
    break;
  else
    Console.Write (c);

Можно так:

Parallel.ForEach ("Hello, world", (c, loopState) =>
{
  if (c == ',')
    loopState.Break();
  else
    Console.Write (c);
});
Hlloe 

По результатам выполнения видно, что тело цикла может завершиться в случайном порядке. Но кроме этого различия, вызов метода Break приводит к выполнению по крайней мере того же числа элементов, что и при выполнении цикла последовательно: в этом примере будут выведены по крайней мере буквы H, e, l, l, и o в произвольном порядке. Однако вызов метода Stop, в отличие от метода Break, приведет к завершению всех потоков сразу же после выполнения текущей итерации. В нашем примере, вызов метода Stop приведет к выводу подмножества букв H, e, l, l, и o, если какой-то из потоков будет запаздывать. Вызов метода Stop полезен в тех случаях, когда вы либо нашли то, что искали, либо когда что-то пошло не так и вы не хотите обрабатывать результаты.

ПРИМЕЧАНИЕ
Методы Parallel.For и Parallel.ForEach возвращают объект ParallelLoopResult, который содержит свойства IsCompleted и LowestBreakIteration. Эти свойства могут дать вам понять, закончилось ли выполнение цикла полностью, и если нет, то на какой итерации цикла выполнение было прервано.

Если LowersBreakIteration возвращает null, это означает, что был вызван метод Stop (а не Break).

Если тело вашего цикла выполняется очень долго, вы можете захотеть прервать его выполнение из другого потока в середине выполнения путем вызова методов Break или Stop. Вы можете добиться этого путем опроса свойства ShouldExitCurrentIteration в различных участках кода; это свойство начинает возвращать true сразу после вызова метода Stop и вскоре после вызова метода Break.

ПРИМЕЧАНИЕ
ShouldExitCurrentIteration также начинает возвращать true после отмены выполнения (cancelation request) или после генерации исключения в теле цикла.

Свойство IsExceptional позволяет узнать, произошло ли исключение в другом потоке. Любые необработанные исключения приведут к остановке выполнения цикла, после окончания выполнения текущей итерации другими потоками: чтобы избежать этого вам следует явно обрабатывать исключения в вашем коде.

Оптимизация с помощью локальных значений

Методы Parallel.For и Parallel.ForEach содержат перегруженную версию, которая принимает обобщенный аргумент TLocal. Эти версии предназначены для упрощения объединения данных при использовании циклов с большим количеством итераций. Вот простой пример:

public static ParallelLoopResult For <TLocal> (
  int fromInclusive,
  int toExclusive,
  Func <TLocal> localInitFunc <int, ParallelLoopState, TLocal, TLocal> body,
  Action <TLocal> localFinally);

Необходимость в использовании этих методов на практике возникает редко, поскольку PLINQ покрывает их функциональность (и это хорошо, поскольку эти перегрузки выглядят пугающе!).

По сути, проблема заключается в следующем: предположим, мы хотим найти сумму квадратных корней чисел в диапазоне от 1 до 10,000,000. Задача вычисления 10 миллионов квадратных корней очень легко распараллеливается, но их суммирование является проблемой, поскольку придется использовать блокировку при обновлении суммы:

object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
              i => { lock (locker) total += Math.Sqrt (i); });

Однако выгода от параллелизма превосходит накладные расходы на получение 10 миллионов блокировок плюс длительность каждой блокировки.

Однако на самом деле, нам не нужно получать эти 10 миллионов блокировок. Представьте себе команду добровольцев, которая занимается уборкой огромного количества мусора. Если каждый рабочий будет использовать одну единственную общую мусорную урну, перенос мусора в эту урну и конкуренция за нее сделают эту работу чрезвычайно неэффективной. Очевидным решением в этом случае является обеспечение каждого сотрудника своим собственным или «локальным» мусорным баком, который время от времени опустошается в большой общий бак.

Методы For и ForEach, которые принимают TLocal, работают именно так. Добровольцами в этом случае являются рабочие потоки, а локальные значения представляют собой «локальные» мусорные баки. Для того, чтобы класс Parallel сделал это, вы должны передать ему два дополнительных делегата, которые указывают:

  1. Как инициализировать новое локальное значение
  2. Как объединить локальные значение с главным значением

Вдобавок к этому,  делегат тела цикла вместо возврата void, должен возвращать новый аккумулятор локальных значений. Вот наш переработанный пример:

object locker = new object();
double grandTotal = 0;
Parallel.For (1, 10000000,
  () => 0.0,                        // Инициализация локального значения.
  (i, state, localTotal) =>         // Делегат тела цикла. Обратите внимание, что он
     localTotal + Math.Sqrt (i),    // возвращает новую локальную сумму.
  localTotal =>                                    // Добавление локального значение
    { lock (locker) grandTotal += localTotal; }    // к основному значению.
);

Мы все еще используем блокировку, но на этот раз только вокруг добавления локального значения к основной сумме. Это делает процесс вычисления значительно более эффективным.

ПРИМЕЧАНИЕ
Как уже говорилось ранее, в подобных случаях отлично подходит PLINQ. Наш пример может быть распараллелен очень просто с помощью PLINQ следующим образом:

ParallelEnumerable.Range(1, 10000000)
                  .Sum (i => Math.Sqrt (i))

(Обратите внимание, что мы используем ParallelEnumerable для принудительного использования секционирования по диапазону (range partitioning): это улучшает производительность, поскольку вычисление всех значений требует примерно одинакового количества времени.)

В более сложных случаях, вы можете использовать LINQ оператор Aggregate вместо Sum. Если вы укажете фабричный метод инициализации начального значения, ситуация будет более или менее похожа на использование функции Parallel.For с использованием локальных значений.

Параллелизм задач

Параллелизм задач (task parallelism) – это подход наиболее низкого уровня для распараллеливания задач с помощью PFX. Классы этого уровня определены в пространстве имен System.Threading.Tasks:

Class

Purpose

Task Для управления единицей работы
Task<TResult> Для управления единицей работы, которая возвращает значение
TaskFactory Для создания задач
TaskFactory<TResult> Для создания задач и продолжений с тем же типом возвращаемого значения
TaskScheduler Для управления планировщиком задач
TaskCompletionSource Для ручного управления жизненным циклом задачи

По сути, задание представляет собой легковесный объект для управления распараллеливанием единицы работы (unit of work). Задание избегает накладных расходов по запуску выделенного потока путем использования пула потоков CLR: тот же самый пул потоков используется при вызове функции ThreadPool.QueueUserWorkItem, настроенный  в CLR 4.0специальным образом для более эффективной работы вместе с заданиями (и более эффективной работы в целом).

Задания могут применяться всегда, когда вам нужно выполнить что-либо  параллельно. Однако они оптимизированы для повышения эффективности многоядерных процессоров: фактически класс Parallel и PLINQ построены на основе конструкций параллелизма задач.

Задания делают значительно больше, нежели предоставляют эффективный способ использования пула потоков. Они также предоставляют возможности для управления единицами работы (units of work), включая следующие возможности:

Задачи также реализуют локальные рабочие очереди (local work queues), технику оптимизации, которая позволяет эффективно создавать множество коротких дочерних заданий не подвергаясь накладным расходам по конкуренции за общие ресурсы, которые бы возникали в случае использования одной рабочей очереди.

ВНИМАНИЕ
Библиотека параллелизма задач (Task Parallel Library) позволяет вам создавать сотни (даже тысячи) задач с минимальными накладными расходами. Но если вам нужно создавать миллионы задач, то для поддержания эффективности вам нужно разбить их на более крупные единицы работы. Класс Parallel и PLINQ делают это автоматически.

ПРИМЕЧАНИЕ
Visual Studio 2010 предоставляет новое окно для мониторинга задач (Debug | Window | Parallel Tasks). Это окно эквивалентно окну Threads, но предназначено для заданий. В окне Parallel Stacks также предусмотрен специальный режим для задач.

Создание и запуск задачи

Как мы обсудили в Части 1 при обсуждении пула потоков, вы можете создать и запустить Task вызвав метода Task.Factory.StartNew, передав в него делегат Action:

Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!")); 

Обобщенная версия, Task<TResult> (наследник класса Task), позволяет получить данные после завершения выполнения задачи:

Task<string> task = Task.Factory.StartNew<string> (() =>    // Начало задачи
{
  using (var wc = new System.Net.WebClient())
    return wc.DownloadString ("http://www.linqpad.net");
});
RunSomeOtherMethod();         // Мы можем выполнить другую работу параллельно...
string result = task.Result// Ожидаем завершения задачи для получения результатов.

Метод Task.Factory.StartNew создает и запускает задачу. Вы можете разъединить эти операции, вначале создав объект Task, а затем вызвав метод Start:

var task = new Task (() => Console.Write ("Hello"));
...
task.Start();

Задача, созданная таким образом, может быть выполнена синхронно (в том же потоке) путем вызова метода RunSynchronously вместо вызова метода Start.

ПРИМЕЧАНИЕ
Вы можете отслеживать статус выполнения задачи с помощью свойства Status.

Указание объекта состояния

При создании экземпляра задачи или вызова метода Task.Factory.StartNew, вы можете указать объект состояния, который будет передан целевому методу. Это может быть удобно, когда вы хотите вызвать метод напрямую, не используя лямбда-выражение:

static void Main()
{
  var task = Task.Factory.StartNew (Greet, "Hello");
  task.Wait();  // Ожидание завершения задачи.
}
static void Greet (object state) { Console.Write (state); }   // Hello

При наличии в C# лямбда-выражений, мы можем найти объекту состояния лучшее применение, так, например, мы можем задать разумное имя задачи. Затем мы можем использовать свойство AsyncState для получения этого имени:

static void Main()
{
  var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
  Console.WriteLine (task.AsyncState);   // Greeting
  task.Wait();
}
static void Greet (string message) { Console.Write (message); }

ПРИМЕЧАНИЕ
Visual Studio отображает свойство AsyncState каждой задачи в окне Parallel Tasks, так что наличие разумного имени может значительно облегчить процесс отладки.

TaskCreationOptions

Вы можете настроить процесс выполнения задачи путем использования перечисления TaskCreationOptions во время вызова метода StartNew (или создания экземпляра класса Task). TaskCreationoptions – это флаговое перечисление со следующими (объединяемыми) значениями:

  • LongRunning
  • PreferFairness
  • AttachedToParent

Значение LongRunning говорит планировщику выделить для задачи отдельный поток. Это целесообразно для задач с длительным временем выполнения (long-running tasks) поскольку в противном случае они могут «подвесить» очередь и заставить короткие задачи ожидать неразумное количество времени, прежде чем они смогут быть выполнены. Значение LongRunning также хорошо подходит для блокирующихся задач.

ПРИМЕЧАНИЕ
Проблема с очередью задач обычно возникает из-за того, что планировщик задач обычно старается поддерживать в определенный момент времени количество задач, необходимое для полной загрузки всех ядер процессора. Отсутствие перегрузки (oversubscribing) процессора слишком большим количеством активных потоков предотвращает падение производительности, которое происходит в следствие того, что операционная система вынуждена выполнять большое количество дорогих операций по квантованию времени (time slicing) и переключению контекста (context switching).

Значение PreferFairness говорит планировщику попытаться распределить задачи в том же порядке, в котором они были запущены. Обычно это не так, поскольку планировщик оптимизирует работу задач путем использования локальных очередей. Эта оптимизация приносит плоды на очень мелких задачах.

AttackedToParent предназначено для создания дочерних задач.

Дочерние задачи

Когда одна задача запускает другую, вы можете, если хотите, путем указания TaskCreationOptions.AttachedToparent вы можете установить отношения типа родительская задача / дочерняя задача:

Task parent = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("I am a parent");
  Task.Factory.StartNew (() =>        // Detached task //независимая задача
  {
    Console.WriteLine ("I am detached");
  });
  Task.Factory.StartNew (() =>        // Child task //дочерняязадача
  {
    Console.WriteLine ("I am a child");
  }, TaskCreationOptions.AttachedToParent);
});

Дочерняя задача отличается тем, что при ожидании завершения родительской задачи, вы будете также ожидать завершения и всех дочерних задач. Как мы вскоре увидим, это может быть особенно полезно, когда дочерняя задача является продолжением.

Ожидание задач

Вы можете явно ожидать завершение задачи двумя способами:

  • Путем вызова метода Wait (с опциональным указанием тайм-аута).
  • Птем доступа к свойству Result (в случае использования класса Task<TResult>)

Вы также можете одновременно ожидать завершения нескольких задач с помощью статического метода Task.WaitAll (ожидать завершения всех указанных задач) и Task.WaitAny (ожидать завершения одной любой задачи).

Метод WaitAll аналогичен последовательному ожиданию всех задач, однако он более эффективен, поскольку он приводит (в большинстве случаев) всего лишь к одному переключению контекста. Также, если одна или более задача генерирует необработанное исключение, метод WaitAll продолжает ожидание всех остальных задач, и только потом выбрасывает одно исключение AggregateException, которое содержит все исключения неудачно завершившихся задач. Вызов этого метода эквивалентен следующему:

// Предполагаем, что t1, t2 и t3 – это задачи:
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);

Вызов метода WaitAny эквивалентен ожиданию ManualResetEventSlim, который переводится в сигнальное состояние при завершении каждой задачи.

Помимо тайм-аута, в Wait-методы вы также можете передать маркер отмены: это позволит вам отменить ожидание, но не сами задачи.

Обработка ошибок в задачах

Когда вы ожидаете завершения задачи (либо путем вызова метода Wait, либо путем доступа к свойству Result), любое необработанное исключение будет проброшено вызывающему коду, обернутое в объект AggregationException. Обычно это устраняет необходимость в написании кода обработки непредвиденных исключений внутри задачи; вместо этого, вы можете сделать так:

int x = 0;
Task<int> calc = Task.Factory.StartNew (() => 7 / x);
try
{
  Console.WriteLine (calc.Result);
}
catch (AggregateException aex)
{
  Console.Write (aex.InnerException.Message);  // Пытались делить на 0
}

Вам все еще нужно обрабатывать исключения автономных задач (задач, которые не являются родительскими для других задач и завершения выполнения которых никто не ожидает) для того, чтобы предотвратить крах приложения из-за необработанных исключений, после того, как эта задача будет обработана сборщиком мусора (это тема следующего примечания). Это также касается задач, завершение выполнения которых ожидается по тайм-ауту, поскольку все исключения, которые будут сгенерированы после истечения тайм-аута станут «необработанными».

ПРИМЕЧАНИЕ
Статическое событие TaskScheduler.UnobservedTaskException предоставляет последнюю возможность сделать что-либо с необработанными исключениями задачи. Путем обработки этого события, вы можете перехватить исключения задачи и предоставить собственную логику их обработки и предотвратить завершение работы приложения.

Для родительских задач, ожидание ее завершения приводит к ожиданию всех дочерних задач и к обработке исключений, возникших в этих задачах.

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() =>   // Дочерняя задача
  {
    Task.Factory.StartNew (() => { throw null; }, atp);   // Дочерняя задача дочерней задачи
  }, atp);
});
// Следующий вызов приведет к генерации NullReferenceExcpetion
// которое будет содеражаться в AggregateException
parent.Wait(); 

ПРИМЕЧАНИЕ
Интересно, что проверка свойства задачи Exception, после возникновения исключения предотвратит последующую гибель вашего приложения. Причина заключается в том, что разработчики PFX не хотели, чтобы вы игнорировали исключения, но если вы каким-либо образом подтвердили его получение, этого достаточно, чтобы предотвратить гибель вашего приложения.

ВНИМАНИЕ
Необработанное исключение, возникшее в задаче не приводит к немедленной гибели вашего приложения: это произойдет после того, как сборщик мусора не доберется до объекта задачи и не вызовет его финализатор. Окончание работы приложения откладывается поскольку до тех пор, пока ваша задача не будет очищена сборщиком мусора невозможно узнать, собирались ли вы вызвать метод Wait или обращаться к свойствам Result или Exception. Эта задержка иногда может усложнять поиск исходной ошибки (хотя отладчик Visual Studio может помочь вам в этом путем остановки выполнения при возникновении исключения (first-change excpetion).

Как мы вскоре увидим, альтернативной стратегией обработки исключений является применение продолжений.

Отмена выполнения заданий

При запуске задачи вы можете опционально передать маркер отмены. Это позволит вам отменить выполнения задачи с помощью кооперативного паттерна отмены, рассмотренного ранее:

var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew (() =>
{
  // Выполняем некоторые действия
  token.ThrowIfCancellationRequested();  // Проверяем наличие запроса отмены
  // Выполняем некоторые действий
}, token);
...
cancelSource.Cancel();

Для определения отмененной задачи, необходимо перехватить AggregateException и проверить вложенное исключение следующим образом:

try 
{
  task.Wait();
}
catch (AggregateException ex)
{
  if (ex.InnerException is OperationCanceledException)
    Console.Write ("Task canceled!");
}

ПРИМЕЧАНИЕ
Если вы хотите явно выбросить OperationCanceledException (вместо вызова token.ThrowIfCancellationRequested), вы должны передать маркер отмены в конструкторе OperationCanceledException. Если вы этого не сделаете, задача не завершится со статусом TaskStatus.Canceled и не сгенерирует событие OnlyOnCanceled для продолжений.

При отмене выполнения задачи, выполнение которой еще не началось, ее выполнение не будет запланировано (scheduled), вместо этого исключение OperationCanceledException будет сгенерировано немедленно.

Поскольку другие API понимают маркеры отмены, вы можете передать их другим конструкциям, в таком случае все операции будут отменены без вашего вмешательства.

var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew (() =>
{
  // Передаем наш маркер отмены в PLINQ-запрос
  var query = someSequence.AsParallel().WithCancellation (token)...
  ... enumerate query ...
});

Вызов метода Cancel объекта cancelSource, в этом примпере, приведет к отмене PLINQ-запроса, что приведет к генерации исключения OperationCanceledException в теле задачи, что, в свою очередь, приведет к отмене выполнения этой задачи.

ПРИМЕЧАНИЕ
Маркеры отмены, который вы можете передать в такие методы как Wait и CancelAndWait позволяют вам отменять ожидание выполнения операции, а не саму задачу.

Продолжения

Иногда бывает полезным запустить на выполнение задачу сразу же после завершения выполнения другой задачи (или после завершения этой задачи неудачно). Метод ContinueWith класса Task предоставляет именно такую возможность.

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));

Как только задача task1 (родительская задача) завершится, упадет или будет отменена, задача task2 (продолжение) запустится автоматически. (Если задача task1 завершит свое выполнение до выполнения второй строки кода, задача task2 начнет свое выполнение сразу же). Аргумент ant, который передается в задачу-продолжение в лямбда-выражении является ссылкой на родительскую задачу.

В нашем примере показан самый простой тип продолжений, который функционально эквивалентен следующему:

Task task = Task.Factory.StartNew (() =>
{
  Console.Write ("antecedent..");
  Console.Write ("..continuation");
});

Однако подход на основе продолжений более гибок, поскольку вначале вы можете ожидать завершение задачи task1, а затем ожидать завершение задачи task2. Это может быть особенно важным, если задача task1 возвращает данные.

ПРИМЕЧАНИЕ
Еще одним (небольшим) отличием является то, что по умолчанию, родительская задача и ее продолжение может выполняться в разных потоках. Вы можете заставить принудительно выполняться их в одном потоке, указав TaskContinuationOptions.ExecuteSynchronously при вызове метода ContinueWith: это может улучшить производительность очень мелких продолжений путем уменьшения накладных расходов.

Продолжения и Task<TResult>

Как и обычные задачи, продолжения могут иметь тип Task<TResult> и возвращать некоторые данные. В следующем примере, мы вычисляем Math.Sqrt(8*2) путем выполнения нескольких связанных задач и только потом выводим результат вычисления:

Task.Factory.StartNew<int> (() => 8)
  .ContinueWith (ant => ant.Result * 2)
  .ContinueWith (ant => Math.Sqrt (ant.Result))
  .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

Наш пример неестественно прост; в реальных приложениях, лямбда-выражения будут вызывать функции с интенсивными вычислениями.

Продолжения и исключения

Продолжение может понять, было ли сгенерировано исключение родительской задачей с помощью свойства Exception родительской задачи. Следующий код выводит на экран подробную информацию исключения NullReferenceException на консоль:

Task task1 = Task.Factory.StartNew (() => { throw null; });
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));

ВНИМАНИЕ
Если родительская задача генерирует исключение, а продолжение не проверит свойство Exception родительской задачи (и никто не ожидает завершения родительской задачи), исключение считается необработанным и приложение умирает (если это исключение не будет обработано с помощью TaskScheduler.UnobservedTaskException).

Безопасным паттерном является пробрасывание родительского исключения. Если кто-либо ожидает завершения продолжения (путем вызова Wait), исключение будет проброшено коду, ожидающему завершения продолжения.

Task continuation = Task.Factory.StartNew     (()  => { throw null; })
                                .ContinueWith (ant =>
  {
    if (ant.Exception != null) throw ant.Exception;    // Продолжаем обработку...
  });
continuation.Wait();    // Теперь исключение будет проброшено вызывающему

Другим способом работы с исключениями является применение различных продолжений для нормального и ненормального завершений родительской задачи. Это делается с помощью TaskContinuationOptions:

Task task1 = Task.Factory.StartNew (() => { throw null; });
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                                 TaskContinuationOptions.OnlyOnFaulted);
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                              TaskContinuationOptions.NotOnFaulted);

Этот паттерн особенно полезен вместе с дочерними задачами, которые будут рассмотрены очень скоро.

Следующий метод расширения "проглатывает" необработанные исключения задачи:

public static void IgnoreExceptions (this Task task)
{
  task.ContinueWith (t => { var ignore = t.Exception; },
    TaskContinuationOptions.OnlyOnFaulted);
}

(Этот код можно улучшить путем добавления логирования исключений.) Вот как можно его использовать:

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions(); 
Продолжения и дочерние задачи

Полезной возможностью продолжений является то, что они начинаются только после завершения дочерних задач. При этом любые исключения, генерируемые дочерними задачами пробрасываются в задачи-продолжения.

В следующем примере, мы запускаем три дочерние задачи, каждая из которых генерирует NullReferenceException. Мы затем обрабатываем их все в одном месте с помощью продолжения родительской задачи:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);

 image

Продолжение по условию

По умолчанию, запуск продолжений осуществляется безусловно (unconditionally): не важно, закончилась родительская задача успешно, произошло исключение или ее выполнение было отменено. Вы можете изменить это поведение путем установки одного (объединяемого) флага перечисления TaskContinuationOptions. Существует три ключевых флага, который контролируют условие запуска продолжения:

NotOnRanToCompletion = 0x10000,
NotOnFaulted = 0x20000,
NotOnCanceled = 0x40000,

Эти флаги обладают следующим свойством: чем больше флагов вы применяете, тем меньше вероятность того, что продолжение будет выполнено. Для удобства, существуют несколько объединенных значений:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted

(Объединение всех Not* флагов (NotOnRanToCompletion, NotOnFaulted, NotOnCanceled) не имеет смысла.)

RanToCompletion означает, что родительская задача завершилась успешно (она не была отменена или завершилась с необработанным исключением)

Faulted означает, что необработанное исключение было брошено родительской задачей

Canceled означает одно из двух:

  • Выполнение родительской задачи было отменено с помощью маркера отмены. Другими словами исключение OperationCanceledException было выброшено родительской задачей при этом свойство CancellationToken соответствует параметру, переданному родительской задаче при ее запуске.
  • Выполнение родительской задачи было отменено неявно, поскольку не было удовлетворено условие предиката для запуска продолжения

Очень важно понимать, что если продолжение не выполнено из-за этих флагов, то оно не забыто и не покинуто, его выполнение отменено. Это значит, что продолжение продолжения будет запущено, если они не были созданы с флагом NotOnCanceled. Например, давайте рассмотрим следующий код:

Task t1 = Task.Factory.StartNew (...);
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));

После запуска, задача t3 обязательно будет запущена, даже если t1 не сгенерирует исключение. Это связано с тем, что если задача t1 завершится успешно, задача falt будет отменена, и поскольку для задачи t3 ограничения не заданы, она будет выполнена безусловно.

image

Если мы хотим, чтобы задача t3 запускалась только после выполнения задачи fault, мы должны написать так:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
               TaskContinuationOptions.NotOnCanceled); 

(С другой стороны, мы можем указать OnlyOnRanToCompletion; в этом случае t3 не будет выполнена, если по ходу выполнения fault произойдет исключение.)

Продолжения по нескольким предыдущим задачам

Другой полезной возможностью продолжений является то, что вы можете планировать их выполнение после выполнения нескольких предыдущих задач. Значение ContinueWhenAll приведет к запуску задачи только после завершения всех предыдущих задач. Оба эти методы определены в классе TaskFactory:

var task1 = Task.Factory.StartNew (() => Console.Write ("X"));
var task2 = Task.Factory.StartNew (() => Console.Write ("Y"));
var continuation = Task.Factory.ContinueWhenAll (
  new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));

Этот код выведет “Done”  только после того, как выведет “XY” и “YX”. Аргумент tasks в лямбда-выражении дает вам доступ к массиву завершенных задач, что бывает полезным, когда предшественники возвращают какие-то данные. В следующем примере складываются два значения, которые возвращаются из двух предшествующих задач:

// task1 и task2 в реальных приложениях будут вызывать сложные функции:
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);
Task<int> task3 = Task<int>.Factory.ContinueWhenAll (
  new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result));
Console.WriteLine (task3.Result);           // 579

ПРИМЕЧАНИЕ
В этом примере мы включили аргумента типа <int> в наш вызов Task.Factory, чтобы пояснить, что мы получаем обобщенную фабрику задач. Аргумент типа является не обязательным, поскольку он может быть выведен компилятором.

Несколько продолжений одной задачи

Вызов метод ContinueWith более одного раза для одной и той же задачи создает несколько продолжений одной задачи. Когда предыдущая задача завершается, все продолжения запускаются одновременно (если вы не укажите TaskContinuationOptions.ExecuteSynchronously, в этом случае продолжения будут выполняться последовательно).

При выполнении следующего примера, происходит задержка в одну секунду, а затем на экран выводится “XY” или “YX”:

var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith (ant => Console.Write ("X"));
t.ContinueWith (ant => Console.Write ("Y"));
Планировщики заданий и пользовательский интерфейс

Планировщик задач (task scheduler) назначает задания определенным потокам. Все задания связаны с определенным планировщиком, который представлен абстрактным классом TaskScheduler. .Net Framework предоставляет две конкретные реализации: планировщик по умолчанию (default scheduler), который работает совместно с пулом потоков CLR, и планировщик контекста синхронизации (synchronization context scheduler). Последний разработан (в основном) для упрощения работы с WPF и Windows Forms, которые требуют, чтобы обращение к элементам пользовательского интерфейса и элементам управления происходило только из потока, в котором они были созданы. Предположим, например, что мы в фоновом режиме хотим получить некоторые данные от веб-сервиса и затем, на основе полученных результатов, обновить метку (label) с именем lblResult. Мы можем разбить эту задачу на две подзадачи:

  1. Вызвать метод для получения данных от веб-сервиса (родительская задача).
  2. Обновить lblResult на основе полученных результатов (задача-продолжение).

Если для задачи-продолжения мы укажем планировщик контекста синхронизации, полученный при создании окна, тогда мы сможем спокойно обновить lblResult:

public partial class MyWindow : Window
{
  TaskScheduler _uiScheduler;   // Объявляем поле, чтобы мы могли использовать его
                                // повсюду в нашем классе.
  public MyWindow()
  {   
    InitializeComponent();
    // Получаем планировщик UI для потока, который создает эту форму:
    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    Task.Factory.StartNew<string> (SomeComplexWebService)
      .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
  }
  string SomeComplexWebService() { ... }
}

Также существует возможность написать свой собственный планировщик задач (путем создания наследника класса TaskScheduler), хотя это может быть полезным лишь в очень специфических сценариях. Для ручной настройки планирования заданий, обычно используется класс TaskCompletionSource, который мы вскоре рассмотрим.

TaskFactory

Когда вы обращаетесь к Task.Factory, вы обращаетесь к статическому свойству класса Task, которое возвращает объект класса TaskFactory по умолчанию. Фабрики задач (task factory) предназначены для создания задач, в частности, для создания трех типов задач:

  • «Обычные» задачи (с помощью StartNew)
  • Продолжения с несколькими родительскими задачами (с помощью ContinueWhenAll и ContinueWhenAny)
  • Задачи, которые являются оболочками методов, которые следуют модели асинхронного программирования (asynchronous programming model, APM) (с помощью FromAsync)

ПРИМЕЧАНИЕ
Интересно, что TaskFactory является единственным  способом для последних двух случаев. В первом случае (StartNew), TaskFactory является всего лишь удобным средством, и технически является избыточным, поскольку вы можете просто создать объект класса Task и вызвать затем метод Start.

Создание собственных фабрик задач

TaskFactory – это не абстрактная фабрика: вы можете создать его экземпляр; это может быть полезно, когда вы хотите создать несколько задач с одними и теми же (не стандартными) значениями TaskCreationOptions, TaskContinuationOptions или TaskScheduler. Например, если вы хотите создать несколько длительных родительских задач, мы можем создать фабрику следующим образом:

var factory = new TaskFactory (
  TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
  TaskContinuationOptions.None);

В таком случае для создания задач, остается всего лишь вызвать метод StartNew фабрики:

Task task1 = factory.StartNew (Method1);

Task task2 = factory.StartNew (Method2);
...

Настройки поведения продолжений вступят в силу при вызове методов ContinueWhenAll и ContinueWhenAny.

TaskCompletionSource

Класс Task предназначен для двух независимых целей:

  • он запускает на выполнение делегат в потоке пула потоков
  • он предоставляет богатый набор возможностей для управления рабочими элементами (продолжения, дочерние задачи, маршалинг исключений и т.д.).

Интересно, что по-сути, эти возможности не связаны: вы можете использовать возможности задач для управления рабочими элементами без использования пула потоков. Класс, который предоставляет такие возможности, называется TaskCompletionSource.

Для использования TaskCompletionSource вам просто нужно создать экземпляр этого класса. Он предоставляет свойство Task, которое возвращает задачу, завершение выполнения которой вы можете ожидать или присоединять к ней продолжения, аналогично тому, как это делается с любыми другими задачами. Однако задача полностью контролируется с помощью объекта TaskCompletionSource посредством следующих методов:

public class TaskCompletionSource<TResult>
{
  public void SetResult (TResult result);
  public void SetException (Exception exception);
  public void SetCanceled();
  public bool TrySetResult (TResult result);
  public bool TrySetException (Exception exception);
  public bool TrySetCanceled();
  ...
}

Если методы SetResult, SetException или SetCanceled вызвать более одного раза, то будет сгенерировано исключение; вызов же методов Try* вернет false.

ПРИМЕЧАНИЕ
TResult соответствует типу возвращаемого значения задачи, так что TaskCompletionSource<int> вернет Task<int>. Если вам нужна задача, которая ничего не возвращает, создайте TaskCompletionSource<object> и передайте null при вызове SetResult. Затем вы можете привести объект Task<object> к Task.

Следующий пример выводит 1 2 3 через пять секунд ожидания:

var source = new TaskCompletionSource<int>();
new Thread (() => { Thread.Sleep (5000); source.SetResult (123); })
      .Start();
Task<int> task = source.Task; // Наша «подчиненная» задача.
Console.WriteLine (task.Result); // 123

Далее мы увидим, как класс BlockingCollection может быть использован для написания очереди типа поставщик/потребитель. Затем мы покажем, как класс TaskCompletionSource может улучшить решение, предоставив возможность ожидать завершения и отменять выполнение элементов очереди.

Работа с AggregateException

Как мы уже видели ранее, PLINQ, класс Parallel и задачи автоматически пробрасывают исключения клиенту. Чтобы понять, почему это так важно, давайте рассмотрим следующий LINQ запрос, который генерирует исключение DivideByZeroException на первой итерации:

try
{
  var query = from i in Enumerable.Range (0, 1000000)
              select 100 / i;
  ...
}
catch (DivideByZeroException)
{
  ...
}

Если мы попросим PLINQ распараллелить этот запрос и если будет проигнорирована обработка исключений, то DivideByZeroException будет выброшено в другом потоке, в обход нашего блока catch, что приведет к гибели нашего приложения.

Вот почему исключения автоматически ловятся и пробрасываются вызывающему коду. Но, к сожалению, все обычно сложнее, чем простой перехват DivideByZeroException. Поскольку эти библиотеки упрощают работу с несколькими потоками, на самом деле возможна ситуация, когда два или более исключения возникнут одновременно. Для гарантии того, что информация ни о каких исключениях не будет потеряна, исключения заворачиваются в AggregateException, свойство InnerExceptions которого содержит все перехваченные исключения:

try
{
  var query = from i in ParallelEnumerable.Range (0, 1000000)
              select 100 / i;
  // Перебор результатов
  ...
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.InnerExceptions)
    Console.WriteLine (ex.Message);
}

ПРИМЕЧАНИЕ
PLINQ, как и класс Parallel завершает выполнение запроса или цикла при возникновении первого исключения и обработка последующих элементов и тел цикла не происходит. Однако могут возникнуть и другие исключения до завершения текущей итерации цикла. Первое возникшее исключение доступно через свойство InnerException класса AggregateException.

Flatten и Handle

Класс AggregateException предоставляет несколько методов, которые упрощают обработку исключений: методы Flatten и Handle.

Flatten

Исключения AggregateException довольно часто будут содержать другие исключения с типом AggregateException. Например, это может произойти, если дочерняя задача сгенерирует исключение. Вы можете устранить любое количество уровней вложенности и упростить обработку исключения путем вызова метода Flatten. Этот метод возвращает новый объект AggregateException, который содержит простой плоский список вложенных исключений:

catch (AggregateException aex)
{
  foreach (Exception ex in aex.Flatten().InnerExceptions)
    myLogWriter.LogException (ex);
}
Handle

Иногда бывает удобным перехватывать только определенные типы исключений, а другие типы исключений пробрасывать далее. Метод Handle класса AggregateException предоставляет такую возможность. Он принимает предикат, который выполняется для каждого вложенного исключения:

public void Handle (Func<Exception, bool> predicate)

Если предикат возвращается true, считается, что исключение «обработано». После того, как этот делегат выполнен для каждого исключения, происходит следующее:

  • Если все исключения «обработаны» (делегат вернул true), исключение далее не пробрасывается.
  • Если были исключения, для которых делегат вернул false («необработанные» исключения), создается новый объект AggregateException, содержащий эти исключения, и он пробрасывается далее.

Например, следующий код пробрасывает другой объект AggregateException, который содержит единственное исключение с типом NullReferenceException:

var parent = Task.Factory.StartNew (() => 
{
  // Сгенерирует три исключения одновременно с помощью 3-х дочерних задач:
 
  int[] numbers = { 0 };
 
  var childFactory = new TaskFactory
   (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
 
  childFactory.StartNew (() => 5 / numbers[0]);   // Деление на нуль
  childFactory.StartNew (() => numbers [1]);      // Выход за границы диапазона
  childFactory.StartNew (() => { throw null; });  // Разыменовывание нулевого укзаателя
});
 
try { parent.Wait(); }
catch (AggregateException aex)
{
  aex.Flatten().Handle (ex =>   // Обратите внимание, все еще нужно вызывать метод Flatten
  {
    if (ex is DivideByZeroException)
    {
      Console.WriteLine ("Divide by zero");
      return true;                           // Это исключение «обработано»
    }
    if (ex is IndexOutOfRangeException)
    {
      Console.WriteLine ("Index out of range");
      return true;                           // Это исключение также «обработано»  
    }
    return false;    // Все остальные исключения будут проброшены далее
  });

Параллельные коллекции

Framework 4.0 предоставляет набор новых коллекций в пространстве имен System.Collections.Concurrent. Все они полностью потокобезопасны:

 

Параллельная коллекция

Непараллельный эквивалент
ConcurrentStack<T> Stack<T>
ConcurrentQueue<T> Queue<T>
ConcurrentBag<T> (none)
BlockingCollection<T> (none)

ConcurrentDictionary <TKey,TValue>

Dictionary <TKey,TValue>

Параллельные коллекции иногда бывают полезны при решении обычных задач многопоточности, когда вам нужна потокобезопасная коллекция. Однако есть несколько пояснений:

  • Параллельные коллекции оптимизированы для параллельного программирования. Стандартные коллекции их превосходят во всех случаях, кроме сценариев с высокой конкурентностью.
  • Потокбезопасные коллекции не гарантируют, что код, который их использует будет потокобезопасным.
  • Если в процессе перебора элементов параллельной коллекции другой поток ее модифицирует, исключение сгенерировано не будет. Вместо этого, вы получите смесь старого и нового содержимого.
  • Не существует параллельной версии List<T>.
  • Параллельные классы стека, очереди и набора (bag) внутри реализованы на основе связных списков. Это делает их менее эффективными в плане потребления памяти по сравнению с непараллельными версиями классов Stack и Queue, но более предпочтительными для параллельного доступа, поскольку связные списки являются отличными кандидатами для lock-free или low-lock реализаций. (Поскольку вставка узла в связный список требует модификации лишь пары ссылок, в то время как вставка элемента в структуру данных наподобие List<T> может потребовать перемещения тысяч существующих элементов.)

Другими словами, использование этих коллекций не эквивалентно использованию стандартных коллекций с операторами lock. Например, если мы выполним этот код в одном потоке:

var d = new ConcurrentDictionary<int,int>();
for (int i = 0; i < 1000000; i++) d[i] = 123;

он будет выполняться втрое медленнее, нежели этот код:

var d = new Dictionary<int,int>();
for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;

(Однако чтение ConcurrentDictionary выполняется быстрее, поскольку чтения являются lock-free.)

Параллельные коллекции еще отличаются от стандартных коллекций тем, что они содержат специальные методы для выполнения атомарных операций типа «проверить-и-выполнить» (test-and-act), такие как TryPop. Большинство этих методов унифицированы посредством интерфейса IProducerConsumerCollection<T>.

IProducerConsumerCollection<T>

Существует два основных сценария использования коллекций типа поставщик/потребитель (producer/consumer):

  • Добавление элементов («поставка»)
  • Получение элемента и его одновременное удаление («потребление»)

Классическим примером являются стеки и очереди. Коллекции типа поставщик/потребитель играют значительную роль в мире параллельного программирования, поскольку они прекрасно подходят для эффективных lock-free реализаций.

Интерфейс IProducerConsumerConnection<T> представляет собой потокобезопасные коллекции типа поставщик/потребитель. Следующие классы реализуют этот интерфейс:

Интерфейс IProducerConsumerCollection<T> расширяет интерфейс ICollection<T> путем добавления следующих методов:

void CopyTo (T[] array, int index);
T[] ToArray();
bool TryAdd (T item);
bool TryTake (out T item);

Методы TryAdd и TryTake проверяют, может ли быть выполнена операция добавления/удаления элемента, и если операция может быть выполнена, то она выполняется. Проверка и выполнение операции выполняются атомарно, устраняя необходимость в блокировке, которая понадобилась бы при использовании стандартной коллекции:

int result;
lock (myStack) if (myStack.Count > 0) result = myStack.Pop();

Метод TryTake возвращает false, если коллекция пуста. Метод TryAdd всегда завершается успешно и возвращает true во всех трех существующих реализациях. Если вы напишите свою собственную параллельную коллекцию, которая будет запрещать дубликаты, то она может возвращать false, если такой элемент уже существует в коллекции (например, если вы напишите параллельную версию класса Set).

Конкретный элемент, который удаляется при вызове метода TryTake, определяется конкретной реализацией:

  • В классе стека, метод TryTake удаляет последний добавленный элемент.
  • В классе очереди, метод TryTake удаляет самый первый добавленный элемент.
  • В классе Bag, метод TryTake удаляет любой элемент, который может быть удален максимально эффективно.

Эти три класса в основном реализуют методы TryTake и TryAdd явно (explicitly), предоставляя ту же самую функциональность с помощью других открытых методов с более точными названиями, такими как TryDequeue и TryPop.

ConcurrentBag<T>

Класс ConcurrentBag<T> хранит несортированную коллекцию объектов (с возможными дубликатами). ConcurrentBag<T> подходит в тех случаях, когда вам не важно какой элемент вы получите при вызове методов Take или TryTake.

Преимущество использования ConcurrentBag<T> по сравнению с ConcurrentQueue<T> и ConcurrentStack<T> в том, что при вызове метода Add из нескольких потоков одновременно не происходит практически никакой конкуренции. В противоположность этому, одновременный вызов метода Add для стека или очереди приводит к некоторой конкуренции (хотя и значительно меньшей, нежели блокировка неконкурентной коллекции). Вызов метода Take этого класса также очень эффективен, по крайней мере до тех пор, пока каждый поток получает (take) меньше элементов, чем добавляет.

Внутри ConcurrentBag<T> содержит закрытый связный список для каждого потока. Элементы добавляются в закрытый список, который относится к тому потоку, который вызвал метод Add, тем самым устраняя конкуренцию. Когда вы перебираете значения коллекции, енумератор проходит по закрытому списку каждого потока, возвращая каждый элемент этого списка.

Когда вы вызываете метод Take, вначале просматривается закрытый список текущего потока. Если там есть хотя бы один элемент, задача может быть завершена очень быстро и (в большинстве случаев) безо всякой конкуренции. Но если этот список пуст, метод должен «украсть» элемент из закрытого списка другого потока, что может повлечь возможный конкурентный доступ.

Короче говоря, метод Take вернет последний элемент, добавленный этим потоком; если такого элемента нет, то он вернет последний элемент, добавленный другим, случайно выбранным потоком.

Класс ConcurrentBag<T> идеально подходит, когда основными параллельными операциями являются добавление элементов, или когда добавление и получение элементов сбалансировано для каждого потока. Первый вариант мы видели, когда мы использовали Parallel.ForEach для реализации параллельной проверки правописания:

var misspellings = new ConcurrentBag<Tuple<int,string>>();
 
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});

ConcurrentBag будет плохим выбором для очереди типа “поставщик/потребитель», поскольку в этом случае элементы добавляются и удаляются разными потоками.

BlockingCollection<T>

При вызове метода TryTake на любой из коллекций типа «поставщик/потребитель», которые мы осудили ранее (ConcurrentStack<T>, ConcurrentQueue<T> и

ConcurrentBag<T>) возвращается false, если коллекция пуста. В некоторых сценариях полезно ожидать до тех пор, пока элемент не появится.

Вместо добавления соответствующей перегруженной версии метода TryTake с этой функциональностью (что приведет к резкому скачку количества членов после добавления отмены и таймаутов), разработчики PFX решили инкапсулировать эту функциональность в отдельном классе под названием BlockingCollection<T>. Этот класс оборачивает любую коллекцию, которая реализует интерфейс IProducerConsumerCollection<T>, и позволяет получать элемент из нижележащей коллекции, блокируя выполнение, если такого элемента нет.

Блокирующая коллекция также позволяет вам ограничить общий размер коллекции, блокируя поставщика, в случае превышения этого размера. Коллекция, ограниченная таким образом называется ограниченной блокирующей коллекцией (bounded blocking collection).

Для использования BlockingCollection<T>:

  1. Создайте экземпляр класса, указав при необходимости коллекцию, реализующую интерфейс IProducerConsumerCollection<T> и максимальный размер (границы) коллекции.
  2. Вызовите методы Add или TryAdd для добавления элементов в нижележащую коллекцию.
  3. Вызовите методы Take или TryTake для удаления (потребления) элементов нижележащей коллекции.

Если при вызове конструктора вы не передадите коллекцию, автоматически будет создан экземпляр класса ConcurrentQueue<T>. Методы добавления и удаления элементов позволяют указать маркеры отмены и таймауты. Методы Add и TryAdd могут блокировать выполнение в случае ограничения размера коллекции; методы Take и TryTake блокируют выполнение, если коллекция пуста.

Другим способом получения элементов коллекции является вызов метода GetConsumingEnumerable. Этот метод возвращает (потенциально) бесконечную последовательность, которая возвращает элементы, когда они становятся доступными. Вы можете принудительно завершить последовательность путем вызова метода CompleteAdding: этот метод также предотвращает последующие добавления элементов в очередь.

Мы уже написали очередь типа поставщик/потребитель, используя методы Wait и Pulse. Вот тот же самый класс, переработанный с использованием BlockingCollection<T> (обработка исключений опущена):

public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
  public PCQueue (int workerCount)
  {
    // Создаем и запускаем независимую задачу для каждого потребителя:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }
 
  public void Dispose() { _taskQ.CompleteAdding(); }
 
  public void EnqueueTask (Action action) { _taskQ.Add (action); }
 
  void Consume()
  {
    // Перечисляемая последовательность будет блокирована при отсутствии элементов
    // и завершится после вызова метода CompleteAdding.
    foreach (Action action in _taskQ.GetConsumingEnumerable())
action();     // Выполнение задачи.
  }
}

Поскольку мы ничего не передали в конструктор BlockingCollection, он автоматически создал экземпляр параллельной очереди. Если бы мы передали экземпляр ConcurrentStack, мы бы получили стек типа поставщик/потребитель.

Класс BlockingCollection также содержит статические методы AddToAny и TakeFromAny, которые позволяют вам добавлять или получать элемент из нескольких блокируемых коллекций. При этом действие выполняется на первой коллекции, способной обработать этот запрос.

Использование TaskCompletionSource

Класс типа поставщик/потребитель, написанный только что, является не гибким в том плане, что мы не можем отслеживать состояние элемента, после его добавления в коллекцию. Было бы здорово, если бы мы могли:

  • Знать, когда элемент обработан.
  • Отменить элемент, обработка которого еще не началась.
  • Удобно обрабатывать любые исключения, генерируемые при обработке элемента.

Идеальным решением было бы наличие метода EnqueueTask, который бы возвращал объект, предоставляющий всю описанную выше функциональность. Хорошая новость в том, что такой класс уже существует, и делает в точности то, что мы только что описали, этим классом является класс Task. Все, что нам нужно – это перехватить управление задачей с помощью TaskCompletionSource:

public class PCQueue : IDisposable
{
  class WorkItem
  {
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly Action Action;
    public readonly CancellationToken? CancelToken;
 
    public WorkItem (
      TaskCompletionSource<object> taskSource,
      Action action,
      CancellationToken? cancelToken)
    {
      TaskSource = taskSource;
      Action = action;
      CancelToken = cancelToken;
    }
  }
 
  BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();
 
  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }
 
  public void Dispose() { _taskQ.CompleteAdding(); }
 
  public Task EnqueueTask (Action action)
  {
    return EnqueueTask (action, null);
  }
 
  public Task EnqueueTask (Action action, CancellationToken? cancelToken)
  {

    var tcs = new TaskCompletionSource<object>();
    _taskQ.Add (new WorkItem (tcs, action, cancelToken));
    return tcs.Task;
  }
 
  void Consume()
  {
    foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
      if (workItem.CancelToken.HasValue &&
          workItem.CancelToken.Value.IsCancellationRequested)
      {
        workItem.TaskSource.SetCanceled();
      }
      else
        try
        {
          workItem.Action();
          workItem.TaskSource.SetResult (null);   // Указываем завершение
        }
        catch (Exception ex)
        {
          workItem.TaskSource.SetException (ex);
        }
  }
}

В методе EnqueueTask мы добавляем в очередь рабочий элемент, который инкапсулирует в себе делегат и объект класса TaskCompletionSource, который в свою очередь позволяет позднее управлять задачей, которая будет возвращена из этого метода.

В методе Consume мы вначале проверяем не отменена ли задача после удаления рабочего элемента. Если нет, мы запускаем делегат и затем вызываем метод SetResult объекта TaskCompletionSource для указания завершения выполнения задачи.

Вот как мы можем использовать этот класс:

var pcQ = new PCQueue (1);
Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!"));
...

Теперь мы можем ожидать завершения задачи task, выполнять продолжения для этой задачи, пробрасывать исключения для продолжений родительской задачи и т.д. Другими словами, мы получили богатство модели задач, реализовав фактически свой собственный планировщик.

SpinLock и SpinWait

В мире параллельного программирования использование спин-примитивов всегда предпочтительнее использования блокировок, поскольку это устраняет накладные расходы на переключение контекста и переходы в режим ядра. SpinLock и SpinWait разработаны специально для того, чтобы помочь в таких случаях. В основном они применяются для разработки собственных конструкций синхронизации.

ПРЕДУПРЕЖДЕНИЕ
SpinLock и SpinWait являются структурами, а не классами! Это решение является серьезной оптимизацией для избегания косвенных накладных расходов сборщика мусора. Это значит, что вы должны быть внимательными и, например, не создать случайно копии экземпляров, путем передачи их в другой метод без модификатора ref или объявления их как readonly полей. Это особенно важно в случае применения SpinLock.

SpinLock

Структура SpinLock позволяет вам заблокировать выполнение, не подвергаясь накладным расходам на переключение контекста, путем того, что поток будет крутиться в бесконечном цикле (spinning) (занят бесполезной работой). Этот подход разумен в сценариях с высокой конкуренцией, когда блокировка будет захватываться на очень короткий промежуток времени (например, при написании потокобезопасного связного списка с нуля).

ПРИМЕЧАНИЕ
Если спин-блокировка будет продолжаться слишком длительное время (мы говорим максимум о миллисекундах), это израсходует отведенный потоку квант времени и приведет к переключению контекста, как и при обычной блокировке. Когда поток получит управление, снова произойдет переключение контекста с помощью цикла «ожидания/переключения» (“spin-yielding”)* Это расходует значительно меньше ресурсов процессора посравнению с обычной спин-блокировкой, но больше по сравнению с обычной блокировкой потока.
На машине с одноядерным процессором в случае ожидания спин-блокировки, переключение контекста произойдет сразу же.

* В русскоязычной литературе нет устойчивого и звучного термина, аналогичного понятию “spin-yielding”. По-сути, это модифицированная версия спин-блокировки (т.е. цикла), внутри которого содержится вызов метода Thread.Yield или Thread.Sleep(10). – Примеч. перев.

Использование структуры SpinLock аналогично использованию обычной блокировки (конструкции lock) за исключением следующего:

  • Спин-блокировки являются структурами (как уже упоминалось ранее).
  • Спин-блокировки не поддерживают реентерабельность (reentrance), это значит, что вы не можете вызвать метод Enter одного и того же объекта SpinLock дважды подряд в одном и том же потоке. Нарушение этого правила приведет либо к генерации и исключения (если включено отслеживание владельца (owner tracking)) или к взаимоблокировке (deadlock) (если отслеживание владельца отключено). Вы можете включить отслеживание владельца при создании объекта спин-блокировки. Отслеживание владельца негативно сказывается на производительности.
  • SpinLock позволяет узнать, захвачена блокировка или нет с помощью свойства IsHeld, и включено ли отслеживание владельца с помощью свойства IsheldByCurrentThread.
  • Язык C# не содержит синтаксического сахара для SpinLock, аналогичного оператору lock.

Другим отличием является то, что при вызове метода Enter вы должны следовать шаблону надежной передачи аргумента lockTaken (что практически всегда реализуется с помощью блока try/finally).

Вот пример:

var spinLock = new SpinLock (true);   // Включаем отслеживание владельца
bool lockTaken = false;
try
{
  spinLock.Enter (ref lockTaken);
  // Выполняем некоторые операции...
}
finally
{
  if (lockTaken) spinLock.Exit();
}

Как и при использовании обычной блокировки, значение lockTaken после вызова метода Enter будет равным false только в том случае, если метод сгенерирует исключение и блокировка не будет захвачена. Это происходит в очень редких случаях (таких как вызов метода Abort текущего потока или генерации исключения OutOfMemoryException) и позволяет вам точно знать, нужен ли последующий вызов метода Exit.

Структура SpinLock содержит также метод TryEnter, который принимает тайм-аут в качестве параметра.

ПРИМЕЧАНИЕ
Может показаться, что семантика значимых типов и отсутствие поддержки языка для спин-блокировок говорит о том, что разработчики этой конструкции хотят, чтобы вы тратили дополнительное время каждый раз при их использовании! Но вам следует хорошо подумать, прежде чем отказываться от
обычной блокировки.

Наиболее разумным сценарием применения SpinLock является создание ваших собственных конструкций синхронизации. И даже тогда спин-блокировки не являются такими полезными, как может показаться. Они все еще ограничивают параллелизм. И они понапрасну расходуют ресурсы процессора, не выполняя никакой полезной работы. Обычно лучше потратить часть этого времени с пользой с помощью SpinWait.

SpinWait

SpinWait помогает в разработке lock-free кода, который крутится в цикле (spins), а не блокируется. Это осуществляется путем реализации особы мер, предотвращающих ресурсное голодание (resource starvation) и инверсию приоритета (priority inversion), которые в противном случае могут возникнуть в случае ожидания в цикле (spinning).

ПРЕДУПРЕЖДЕНИЕ
Lock-free программирование с помощью SpinWait – это одна из наиболее сложных техник в многопоточном программировании, которая используется тогда, когда никакие другие более высокоуровневые конструкции не способны решить поставленную задачу. Предварительным условием для использования этой конструкции является понимание принципов
неблокирующей синхронизации.

Зачем нам нужно использовать SpinWait

Предположим, мы написали систему уведомления исключительно с помощью простого флага:

bool _proceed;
void Test()
{
  // Крутимся в цикле, пока другой поток не установит _proceed в true:
  while (!_proceed) Thread.MemoryBarrier();
  ...
}

Этот код будет очень эффективным, если при запуске функции Test флаг _proceed уже будет установлен в true или _proceed станет равным true спустя несколько циклов после этого. Но давайте предположим, что значение _procceed будет равно false в течение нескольких секунд и что метод Test будет вызван одновременно четырьмя потоками. В таком случае ожидание в цикле загрузит все четыре ядра четырехядерного процессора! Это приведет к замедлению работы других потоков (вследствие ресурсного голодания) включая выполнение того потока, который в конечном счете установит значение _proceed в true (инверсия приоритета). Ситуация обостряется еще сильнее на одноядерных компьютерах, где ожидание в цикле практически всегда приводит к инверсии приоритета. (И хотя сегодня одноядерные компьютеры становятся редкостью, одноядерные виртуальные машины – нет.)

Структура SpinWait решает эту проблему двумя путями. Во-первых, она ограничивает ресурсоемкие циклы до определенного количества итераций, после истечения которых квант времени, выделенный текущему потоку прерывается на каждой последующей итерации (путем вызова метода Thread.Yield и Thread.Sleep), снижая тем самым потребление ресурсов. Во-вторых, она определяет, происходит ли выполнение на одноядерном компьютере, и если это так, то прерывает выполнение каждого цикла.

Как использовать SpinWait

Существует два способа использования структуры SpinWait. Первый способ – использовать статический метод SpinUntil. Этот метод принимает предикат (и таймаут в качестве необязательного параметра).

bool _proceed;
void Test()
{
  SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
  ...
}

Другим (более гибким) способом использования SpinWait является создание экземпляра структуры с последующим вызовом SpinOnce в цикле:

bool _proceed;
void Test()
{
  var spinWait = new SpinWait();
  while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
  ...
}
Как работает SpinWait

В текущей реализации SpinWait выполняет 10 итераций, прежде чем произойдет переключение контекста. Однако управление не возвращается немедленно после каждого такого цикла: вместо этого вызывается метод Thread.SpinWait для ожидания с помощью CLR (и, в конечном итоге, операционной системы) на указанный период времени. Изначально этот период составляет несколько десятков наносекунд, но он увеличивается вдвое, на каждой итерации, пока не будет выполнено 10 итераций. Это дает некоторую предсказуемость того, какое количество времени будет потрачено в процессороемкой фазе цикла, и CLR или операционная система может изменять это время в зависимости от некоторых условий. Обычно этот период составляет несколько десятков микросекунд, это не много, но больше, чем затраты на переключение контекста.

На одноядерном компьютере SpinWait переключает контекст выполнения на каждой итерации. Вы можете проверить, будет ли SpinWait прерывать выполнение потока при следующем вызове с помощью свойства NextSpinWillYield.

Если SpinWait будет оставаться в режиме «ожидания/переключения» (“spin-yielding”) достаточно долго (порядка 20 циклов), то он периодически начнет вызвать Thread.Sleep на несколько миллисекунд, для еще большего сберегания ресурсов, что поможет выполнять полезную работу другим потокам.

Lock-free обновления с помощью SpinWait и Interlocked.CompareExchange

SpinWait совместно с Interlocked.CompareExchange может применяться для атомарного обновления полей значениями, вычисляемыми на основе предыдущего значения (read-modify-write). Например, предположим мы хотим умножить значение поля x на 10. Такой простой способ не является потокобезопасным:

x = x * 10;

по той же причине, что и инкремент поля не является потокобезопасной операцией, как мы видели в разделе Неблокирующая синхронизация.

Корректный способ выполнение этой операции без блокировок следующий:

  1. Получить копию значения x в локальную переменную.
  2. Вычислить новое значение (в данном случае путем умножения нашей локальной копии на 10).
  3. Записать вычисленное значение назад, если значение переменной за это время не изменилось (этот шаг должен быть выполнен атомарно путем вызова Interlocked.CompareExchange).
  4. Если значение локальной переменной устарело, выполнить ожидание в цикле (spin) и перейти к шагу 1.

Например:

int x;
 
void MultiplyXBy (int factor)
{
  var spinWait = new SpinWait();
  while (true)
  {
    int snapshot1 = x;
    Thread.MemoryBarrier();
    int calc = snapshot1 * factor;
    int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
    if (snapshot1 == snapshot2) return;   // Нас никто не опередил.
    spinWait.SpinOnce();
  }
}

ПРИМЕЧАНИЕ
Мы может улучшить производительность (немного) убрав вызов функции Thread.MemoryBarrier. Этот вызов необязательный, поскольку метод CompareExchange в любом случае создает барьер памяти, так что худшее, что может произойти в этом случае – это дополнительный цикл ожидания, если snapshot1 получит старое значение на первой итерации.

Метод Interlocked.CompareExchange обновляет значение поля указанным значением, если текущее значение поля совпадает со значением третьего аргумента. Затем он возвращает предыдущее значение поля, так что вы можете проверить успешность обновления, снова сравнив это значение с исходной временной переменной. Если эти значения различаются, то другой поток вас опередил, в таком случае вы ожидаете в цикле (spin) и пробуете снова.

Метод CompareExchange перегружен, чтобы работать также и с типом object. Вы можете воспользоваться этой перегрузкой, чтобы реализовать lock-free обновление любых ссылочных типов:

static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction)
  where T : class
{
  var spinWait = new SpinWait();
  while (true)
  {
    T snapshot1 = field;
    T calc = updateFunction (snapshot1);
    T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1);
    if (snapshot1 == snapshot2) return;
    spinWait.SpinOnce();
  }
}

Вот как мы можем использовать этот метод для потокобезопасной работы с событиями без использования блокировок (кстати, именно так компилятор C# 4.0 реализует события по умолчанию):

EventHandler _someDelegate;
public event EventHandler SomeEvent
{
  add    { LockFreeUpdate (ref _someDelegate, d => d + value); }
  remove { LockFreeUpdate (ref _someDelegate, d => d - value); }
}

ПРИМЕЧАНИЕ: SpinWait vs SpinLock
Мы можем решить эту проблему, обернув доступ к разделяемому полю с помощью SpinLock. Однако проблема со спин-блокировкой заключается в том, что она позволяет продолжить выполнение только одному потоку за раз, хотя и устраняет (обычно) накладные расходы на переключение контекста. С помощью SpinWait мы можем продолжить выполнение, предполагая отсутствие конкуренции. Если значение в процессе этого изменится, мы просто попробуем выполнить эту же операцию еще раз. Лучше тратить ресурсы процессора на что-то, что может выполниться, нежели тратить время процессора на спин-блокировку!

В заключение, давайте рассмотрим следующий класс:

class Test
{
  ProgressStatus _status = new ProgressStatus (0, "Starting");
 
  class ProgressStatus    // Класс неизменяемый
  {
    public readonly int PercentComplete;
    public readonly string StatusMessage;
 
    public ProgressStatus (int percentComplete, string statusMessage)
    {
      PercentComplete = percentComplete;
      StatusMessage = statusMessage;
    }
  }
}

Мы можем использовать наш метод LockFreeUpdate для «инкремента» поля _status класса PercentComplete следующим образом:

LockFreeUpdate (ref _status,
  s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage));

Обратите внимание, что мы создаем новый объект класса ProgressStatus на основе существующего значения. Благодаря методу LockFreeUpdate, процесс чтения существующего значения объекта PercentComplete, увеличение его на единицу и запись этого значения обратно, не может быть прерван небезопасным образом: любые изменения надежным образом отслеживаются и обрабатываются путем ожидания в цикле и повторного выполнения.

5 комментариев:

  1. ParallelLoopSztate: ранний выход из циклов

    ОтветитьУдалить
  2. А зачем коммутативность? Конкатенация односвязных списков не является коммутативной операцией, но можно заставить несколько тредов пробежаться до концов списков параллельно их связать.

    Какой смысл в seedFactory если нету возможности получить хоть какую-либо информацию о chunk-е который собирается обрабатываться? Т.е. я вижу логическую полезность этого только в перемножении векторов, но только если есть информация о том с какого элемента начинается chunk. А в случае с частотами - это особенность C#, где массив нужно создавать. А вот, если бы использовалась структурка с 26-ю field-ами...

    По поводу SpinLock. Правильно ли предположение, что где-то при старте или еще где делается замер времени итераций и потом при синхронизации часть времени проводится в контексте процесса, а часть вне его по средствам принудительного переключения. Конечно, Sleep может и делает подобное, но схема предложения переключения в ядро может быть немножко умнее, мне кажется.

    А что такое Thread.MemoryBarrier ? Описание на MSDN'е очень смутное и похоже что это не настоящая функция а какой-то маркер для JIT/AOT компилятора.

    ОтветитьУдалить