пятница, 3 сентября 2010 г.

[Перевод] Джозеф Албахари. Часть 5.1. Параллельное программирование

Это перевод пятой части статьи Джозефа Албахари (Joseph Albahari) о работе с потоками в C# - “Part 5: Parallel Programming”. Статья целиком очень большая (более 50 страниц), поэтому перевод разбит на несколько частей.

Часть 5: Параллельное программирование

В этом разделе мы рассмотрим новые API для многопоточного программирования, появившиеся в .Net Framework 4.0 повышающие эффективность  использования многоядерных процессоров:

Эти библиотеки широко известны (неформально) как PFX (Parallel Framework). Класс Parallel совместно с конструкциями для параллелизма задач (task parallelism constructs) называются Task Parallel Library или TPL.

В Framework 4.0 также добавлен ряд низкоуровневых конструкций, которые будут также полезны и при обычной работе с многопоточностью. Мы рассмотрели их ранее:

Вы должны понимать фундаментальные принципы, рассмотренные в частях 1-4, в особенности понятия блокировки и безопасности потоков, прежде чем приступить к чтению этого раздела.

Все исходные коды из этого раздела доступны в виде интерактивных примеров на LINQPad. Чтобы получить доступ к этим примерам кликните на Download More Samples на вкладке LINQPad Samples, слева вверху и выберите C# 4.0 in a Nutshell: More Chapters.

Почему PFX?

В последнее время скорость центральных процессоров перестала стремительно расти и производители сосредоточились на увеличении количества ядер. И это является проблемой для нас, как программистов, поскольку при этом наш код, предназначенный для работы в однопоточной среде, не начинает работать быстрее автоматически.

Воспользоваться преимуществами нескольких ядер относительно просто для серверных приложений, где каждый поток может независимо обрабатывать отдельный запрос от клиента, но этого значительно сложнее добиться для клиентских приложений, поскольку в этом случае обычно потребуется преобразовать ваш код, интенсивно использующий вычисления , следующим образом:

  1. Секционировать (partition) его на мелкий фрагменты.
  2. Выполнить каждый фрагмент параллельно в разных потоках.
  3. После окончания выполнения объединить результаты потокобезопасным и эффективным способом.

И хотя вы можете сделать все это с помощью классических конструкций для работы с многопоточностью, такое решение будет неуклюжим, особенно шаги секционирования и объединения результатов. Дополнительной проблемой является то, что эти конструкции для обеспечения потокобезопасности используют типичную стратегию блокировок, которая при одновременной работе нескольких потоков с одними и теми же данными приводит к высокой конкуренции.

Библиотека PFX специально разработана, чтобы помочь разработчику в таких случаях.

Параллельное программирование (parallel programming) – это техника программирования, которая использует преимущества многоядерных или многопроцессорных компьютеров и является подмножеством более широкого понятия многопоточности (multithreading).

Концепции библиотеки PFX

Существует две стратегии разделения работы между потоками: параллелизм данных (data parallelism) и параллелизм задач (task parallelism).

Когда определенный набор задач должен быть выполнен на большом объеме данных, мы можем распараллелить его таким образом, чтобы каждый поток выполнял (одинаковый) набор задач на подмножестве значений. Этот подход называется параллелизмом данных, поскольку мы разбиваем данные между потоками. В противоположность этому, при параллелизме задач мы разбиваем задачи, и в таком случае каждый поток выполняет разную задачу.

В общем случае, параллелизм данных проще и лучше масштабируется на высокопроизводительном оборудовании, поскольку уменьшает или полностью устраняет разделяемые данные (таким образом уменьшая конкурентный доступ и другие проблемы безопасности потоков). Кроме того, параллелизм данных основывается на том факте , что данных обычно значительно больше, чем дискретных задач, что увеличивает потенциал параллельной обработки .

Параллелизм данных также способствует структурному параллелизму (structured parallelism), что означает, что модули параллельной работы начинают и заканчивают работу в одном и том же месте вашей программы. В противоположность этому, параллелизм задач ведет к неструктурированному параллелизму, что означает, что места, в которых модули начинают и заканчивают свою работу, могут быть разбросаны по всему вашему приложению. Структурный параллелизм проще и менее подвержен ошибкам, и позволяет вам возложить всю сложную работу по секционированию данных и координации потоков (и даже объединению результатов) на библиотеки.

Компоненты библиотеки PFX

PFX включает в себя два уровня функциональности. Верхний уровень состоит из двух библиотек для структурного параллелизма данных: PLINQ и класс Parallel. Нижний уровень содержит классы для параллелизма задач – плюс дополнительный набор конструкций, упрощающих решение задач параллелельного программирования.

 image

Библиотека PLINQ предоставляет самую богатую функциональность: она автоматизирует все этапы параллелизма, включая разделение работы на задачи, выполнение этих задач различными потоками и объединение результатов в одну выходную последовательность. Ее использование называется декларативным (declarative), поскольку вы просто объявляете (declare) то, что вы хотите выполнить параллельно (подобно запросам LINQ), и позволяете ей заботиться о деталях реализации. В противоположность этому, другие подходы являются императивными, в этом случае вам нужно явно написать код по разделению задачи и объединению результатов. В случае класса Parallel, вы должны объединить результаты самостоятельно; в случае конструкций параллелизма задач, вы должны еще и разделить задачу самостоятельно:

  Разделение задачи Объединение результатов
PLINQ Да Да
Класс Parallel Да Нет
Параллелизм задач из PFX Нет Нет

Параллельные коллекции (concurrent collections) и спин-примитивы (spinning primitives) помогают в решении низкоуровневых задач параллельного программирования, что является очень важным, поскольку PFX проектировалась для работы не только на современном оборудовании, но и с будущими поколениями процессоров со значительно большим количеством ядер. Если вы хотите перетащить кучу срубленных деревьев и в вашем распоряжении есть 32 человека, самым сложным является координация людей таким образом, чтобы они не путались друг у друга под ногами . Тоже самое касается распределения алгоритма между 32 ядрами: при использовании обычных блокировок для защиты общих ресурсов, результирующая блокировка ресурса может привести к тому, что только часть ядер будет на самом деле занята одновременно. Параллельные коллекции оптимизированы специально для высококонкурентного доступа и сосредоточены на минимизации или устранении блокировок. PLINQ и класс Parallel в частности, основываются на параллельных коллекциях и спин-примитивах для эффективной реализации своей работы.

ПРИМЕЧАНИЕ. PFX и классическая многопоточность
Классические сценарии работы с многопоточностью полезны даже на одноядерных компьютерах, даже при отсутствии реального распараллеливания. Эти сценарии мы рассмотрели ранее: они включают в себя задачи по созданию отзывчивого пользовательского интерфейса и загрузке двух веб-страниц одновременно.
Некоторые конструкции, рассмотренные в разделе параллельного программирования, также иногда применяются в классических задача многопоточности. В частности:­ ­

Когда использовать PFX

Основной сценарий использования PFX – параллельное программирование: повышение эффективности использования многоядерных процессоров для ускорения выполнения кода с большим количеством вычислений.

Самая большая сложность в эффективном использовании многоядерных процессоров заключается в законе Амдала, который заключается в том, что максимальное повышение эффективности за счет распараллеливания ограничено кодом, который должен вычисляться последовательно. Например, если только две трети времени выполнения алгоритма могут быть распараллелены, тогда вы никогда не получите троекратного прироста производительности, даже при бесконечном количестве ядер.

Так что, прежде чем продолжать, стоит проверить, находится ли узкое место в распараллеливаемом коде. Кроме того, стоит подумать о том, должен ли ваш код выполнять такое количество вычислений; оптимизация обычно является самым простым и эффективным подходом. Хотя здесь может потребоваться компромисс, поскольку использование некоторых техник оптимизации может усложнить распараллеливание кода.

Наиболее просто получить выгоду при решении так называемых задач чрезвычайного параллелизма (embarrassingly parallel problems) – когда вся задача может быть легко разбита на подзадачи, которые могут выполняться сами по себе (структурный параллелизм хорошо подходит под этот круг задач). Примерами могут служить задачи обработки изображений, трассировка лучей, подходы на основе «грубой силы» (brute force) в математике или криптографии. Примером задач, не относящихся к этой категории, является реализация оптимизированной версии алгоритма быстрой сортировки, для получения хороших результатов нужно хорошо постараться и в этом случае понадобится неструктурный параллелизм .

PLINQ

PLINQ автоматически распараллеливает локальные LINQ запросы. PLINQ легко использовать, поскольку разбиение задачи и объединения результатов возлагается на него.

Для использования PLINQ, просто вызовите метод AsParallel() на входной последовательности и затем продолжайте, как обычно использовать LINQ запросы. Следующий запрос вычисляет простые числа от 3 до 100,00 – используя все ядра процессора:

// Вычислить простые числа, используя простой (неоптимальный) алгоритм.
 
IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
 
var parallelQuery =
  from n in numbers.AsParallel()
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;
 
int[] primes = parallelQuery.ToArray();

AsParallel – это метод расширения класса System.Linq.ParallelEnumerable. Он оборачивает входные данные в последовательность, построенную на основе ParallelQuery<TSource>, что приводит к тому, что операторы запросов LINQ будут вызывать альтернативный набор методов расширения, определенные в классе PrallelEnumerable. В этом классе определены параллельные реализации всех стандартных операторов запроса, которые, по сути, разбивают входную последовательность на части, каждая из которых обрабатывается разными потоками, объединяя затем результаты в одну выходную последовательность:

image 

Вызов метода AsSequential() разворачивает последовательность таким образом, чтобы последующие операторы вызывали стандартные операторы запросов и выполнялись последовательно. Это необходимо перед вызовов методов со сторонними эффектами или потоконебезопасных методов.

mySequence.AsParallel()           // //Оборачивает последовательность в ParallelQuery<int> 
  ParallelQuery<int>
          .Where (n => n > 100)   //Результатом является еще одна последовательность ParallelQuery<int>
          .AsParallel()          //Нет необходимости – это неэффективно!
          .Select (n => n * n)

Не все операторы запросов могут эффективно распараллеливаться. Те операторы, для которых это невозможно, реализуются в PLINQ последовательно . PLINQ может также выполнить оператор последовательно, если он подозревает, что накладные расходы на параллелизм на самом деле замедлят выполнение запроса.

PLINQ применим только к локальным коллекциям: он не работает с LINQ to SQL или с Entity Framework, поскольку в этих случаях LINQ запросы преобразуются в SQL, которые затем выполняются на сервере баз данных. Однако вы можете использовать PLINQ для выполнения дополнительных локальных запросов на результатах, полученных запросами к базе данных.

ПРЕДУПРЕЖДЕНИЕ
Если в процессе выполнения запроса генерируется исключение, то PLINQ перебрасывает AggregateException, в свойстве InnerExceptions которого будет содержаться настоящее исключение (или исключения). Подробнее см. раздел Работа с AggregateException.

ПРИМЕЧАНИЕ. Почему AsParallel не используется по умолчанию?
Поскольку AsParallel прозрачно распараллеливает запросы LINQ, возникает вопрос: «Почему Майкрософт просто не распараллеливает стандартные операторы запросов и не использует PLINQ по умолчанию?»
Существует несколько причин, почему этот подход не используется по умолчанию. Во-первых, польза от использования PLINQ будет только при наличии значительного количества вычислительных задач, распределенных по рабочим потокам. Многие запросы LINQ to Objects выполняются очень быстро, и распараллеливание не только не нужно, но накладные расходы на разделение данных, объединения результатов и координацию дополнительных потоков могут, на самом деле только ухудшить производительность.
Кроме того:

  • результат работы PLINQ запроса (по умолчанию) могут отличаться от результатов выполнения LINQ запроса порядком выходных элементов.
  • PLINQ заворачивает исключения в AggregateException (для обработки нескольких возможных сгенерированных исключений).
  • PLINQ дает ненадежные результаты, если запросы вызывают потоконебезопасные методы.
  • и, в конце концов, PLINQ предоставляет слишком мало возможностей для оптимизации и тонкой настройки. Обременяя стандартные API LINQ to Objects такими нюансами приведет к беде.
Баллистика параллельного выполнения

Как и обычные LINQ запросы, PLINQ запросы выполняются отложено. Это означает, что выполнение начнется только тогда, когда вы начнете использовать результаты запроса – обычно в цикле foreach (также это может произойти в операторе преобразования, таком как ToArray или операторе, возвращающем единственный элемент или значение).

Однако когда вы перебираете результаты запроса, вычисление несколько отличается от того, что происходит при выполнении обычного запроса. Последовательный запрос полностью управляется вызывающим кодом “pull”-образом: каждый элементы входной последовательности извлекается только по требованию вызывающего кода. Параллельный запрос обычно использует независимые потоки для извлечения элементов из входной последовательности несколько опережая потребности вызывающего кода (как телесуфлер для комментатора или «анти-шок» системы в CD-проигрывателях). Затем он обрабатывает элементы параллельно с помощью цепочки запросов (query chain), сохраняя результаты в маленьком буфере, которые доступны оттуда вызывающему коду по требованию. Если вызывающий код приостанавливает или прекращает выполнение до перебора всех элементов результата, обработчик запроса также приостанавливает или прекращает свое выполнение, чтобы не расходовать понапрасну память и процессорное время.

Вы можете изменить поведение буферизации PLINQ путем вызова метода WithMergeOption после вызова метода AsParallel. Значение по умолчанию – AutoBuffered – в целом дает наилучший результат. NoBuffered отключает буферизацию и полезно, когда вы хотите получить результат как можно скорее; FullyBuffered полностью кэширует результаты, прежде чем вернуть его вызывающему коду (операторы Ordery и Reverse работают, естественно, именно так, как и операторы получения элемента, агрегации или преобразования).

PLINQ и сортировка

Побочным эффектом параллельного выполнения операторов запросов является то, что при объединении результатов порядок элементов не обязательно совпадает с исходным, что и показано на предыдущей диаграмме. Другими словами, гарантии соблюдения порядка следования элементов последовательности, которые выполняются операторами LINQ, не выполняются операторами PLINQ.

Если вам нужно сохранить порядок элементов, вам нужно сделать это явно путем вызова метода AsOrdered() после вызова AsParallel():

myCollection.AsParallel().AsOrdered()...

Вызов метода AsOrdered на последовательностях большого размера снижает производительность, поскольку в этом случае необходимо сохранять исходную позицию элементов.

Вы можете нивелировать снижение эффективности вызова AsOrdered путем последующего вызова AsUnordered: это создает определенную точку, после которой запрос может выполняться эффективнее. Так что, если вы хотите сохранить порядок следования элементов входной последовательности только для первых двух операторов, вы можете поступить таким образом:

inputSequence.AsParallel().AsOrdered()
  .QueryOperator1()
  .QueryOperator2()
  .AsUnordered()       // С этого места порядок уже не имеет значения
  .QueryOperator3()
  ...

Оператор AsOrdered не применяется по умолчанию, поскольку для большинства запросов не важен исходный порядок входной последовательности. Другими словами, если бы AsOrdered применялся по умолчанию, то для повышения эффективности вам бы пришлось вызывать AsUnordered для большинства своих параллельных запросов, что было бы слишком обременительным.

Ограничения PLINQ

На данный момент существует ряд практических ограничений на то, что PLINQ может распараллеливать. Эти ограничения могут быть ослаблены после выпусков сервис паков или будущих версий .Net Framework.

Следующие операторы запросов предотвращают распараллеливание запросов, если исходные элементы не находятся в своих исходных позициях:

  • Take, TakeWhile, Skip и SkipWhile
  • Индексные версии методов Select, SelectMany и ElementAt

Большинство операторов изменяют индексную позицию элементов (включая операторы, удаляющие элементы, такие как Where). Это значит, что если вы хотите использовать эти операторы, то они должны идти вначале запроса.

Следующие операторы являются распараллеливаемыми, но они используют дорогую стратегию секционирования (partitioning stategy), что иногда может привести к менее эффективному результату, нежели последовательная обработка.

  • Join, GroupBy, GroupJoin, Distinct, Union, Intersect, and Except
  • Join, GroupBy, GroupJoin, Distinct, Union, Intersect и Except

Версия оператора Aggregate, принимающая начальное значение в своем стандартном виде не может быть распараллелена, для этого PLINQ предоставляет специальную перегрузку.

Все остальные операторы являются распараллеливаемыми, хотя их применение и не гарантирует то, что ваш запрос будет распараллелен. PLINQ может выполнить ваш запрос последовательно если определит, что накладные расходы на распараллеливание приведут к замедлению выполнения этого запроса. Вы можете изменить это поведение и принудительно заставить запрос выполняться параллельно путем вызова следующего метода после AsParallel():

.WithExecutionMode (ParallelExecutionMode.ForceParallelism)
Пример: параллельная проверка правописания

Предположим мы хотим реализовать проверку правописания, которая будет быстро выполняться с очень большими документами и будет использовать все доступные ядра процессора. Выражая свой алгоритм с помощью LINQ запроса, мы можем очень легко распараллелить эту задачу.

Для начала нужно загрузить словарь английских слов в HashSet для эффективного поиска:

if (!File.Exists ("WordLookup.txt"))    //Содержит порядка 150,000 слов
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
 
var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);

Затем мы воспользуемся этим контейнером для создания тестового «документа», состоящего из миллиона случайных слов. После создания этого документа мы добавим в него несколько ошибок:

var random = new Random();
string[] wordList = wordLookup.ToArray();
 
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();
 
wordsToTest [12345] = "woozsh";     //Добавляю несколько
wordsToTest [23456] = "wubsie";     //ошибок.

Теперь мы можем выполнить параллельную проверку правописания путем тестирования wordsToTest с помощью wordLookup. PLINQ делает это с легкостью:

var query = wordsToTest
  .AsParallel()
  .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
  .Where   (iword => !wordLookup.Contains (iword.Word))
  .OrderBy (iword => iword.Index);
 
query.Dump();     //Отобразить результаты в LINQPad

Вот вывод, полученный с помощью LINQPad:

OrderedParallelQuery<IndexedWord> (2 элемента)
Word Index
woozsh 12345
wubsie 23456

IndexedWord – это структура, определенная следующим образом:

struct IndexedWord { public string Word; public int Index; }

Метод wordLookup.Contains, используемый в предикате, делает разумным распараллеливание этого запроса.

Мы можем немного упростить этот запрос путем использования анонимного типа вместо структуры IndexedWord. Однако, это негативно скажется на производительности, поскольку анонимные типы (будучи классами и, таким образом, ссылочными типами) влекут за собой выделение памяти в управляемой куче и последующую сборку мусора.

Эта разница может быть не заметна при выполнении запросов последовательно, но при параллельном выполнении запросов, использование стековых переменных может быть более эффективным. Это связано с тем, что использование стековых переменных легко распараллеливается (поскольку у каждого потока есть свой стек), в то время как все потоки должны состязаться за использование одной кучи, управляемой одним менеджером памяти и сборщиком мусора.

Использование ThreadLocal<T>

Давайте расширим наш пример и распараллелим создание нашего тестового списка. Мы создаем этот список с помощью LINQ запроса, значит это должно быть просто. Вот последовательная версия:

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

К сожалению, вызов метода random.Next не является потокобезопасным, так что наше решение не сводится к простому вызову AsParallel существующего запроса. Возможным решением является создание функции, которая будет использовать блокировку вокруг вызова random.Next; однако, это ограничит параллелизм. Более подходящим решением является использование ThreadLocal<Random> для создания отдельного объекта Random для каждого потока. Тогда мы сможем распараллелить запрос следующим образом:

var localRandom = new ThreadLocal<Random>
( () => new Random (Guid.NewGuid().GetHashCode()) );
 
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
  .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
  .ToArray();

Наша фабричная функция по созданию объекта Random передает хэш-код Guid-а для гарантии того, что два объекта Random, созданные за короткий промежуток времени, будут генерировать разные случайные последовательности.

ПРИМЕЧАНИЕ. Когда использовать PLINQ
Весьма соблазнительно, найти LINQ запросы в своих существующих приложениях и поэкспериментировать с их распараллеливанием. В большинстве случаев от этого не будет никакого толку, поскольку большинство задач, для которых LINQ оказывается налучшим решением, выполняются очень быстро и, таким образом, не получат никакой выгоды от распараллеливания. Лучше найти узкое место в блоке с большим количеством вычислений и ответить на такой вопрос: «Можно ли это представить в виде LINQ запроса?» (Положительным побочным эффектом такого преобразования является то, что обычно LINQ запросы занимают меньше места и более читабельны).
PLINQ хорошо подходит для задач чрезвычайного параллелизма (embarrassingly parallel problems). Он также хорошо походит для структурных блоков, таких как вызов нескольких сервисов одновременно (см. раздел
Вызов блокирующих функций или функций с интенсивным ввода/вывода).
PLINQ является плохим выбором для работы с фотографиями, поскольку узким местом будет объединение миллионов точек в одну выходную последовательность. Вместо этого, лучше сохранить точки напрямую в массив или блок неуправляемой памяти и использовать класс
Parallel или параллелизм задач для работы с многопоточностью. (Хотя возможно решить проблему объединения результатов путем вызова метода ForAll. Это может иметь смысл, если алгоритм обработки изображения сам привел к использованию LINQ).

Функциональная чистота

Поскольку PLINQ выполняет ваш запрос в параллельных потоках, вы должны заботиться о том, чтобы не выполнять никаких потоконебезопасных операций. В частности, изменение переменных является побочным эффектом, а потому не является потокобезопасным:

// Этот запрос умножает каждый элемент на значение его позиции.
// Если передать на вход Enumerable.Range(0, 999), то результат должен быть возведен в квадрат.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;

Мы можем сделать увеличение i на единицу потокобезопасной операцией путем использования блокировок или Interlocked операций, но проблема все еще будет в том, что i не обязательно будет соответствовать позиции элемента во входной последовательности. И вызов AsOrdered не исправит проблему, поскольку AsOrdered гарантирует только то, что элементы выходной последовательности будут располагаться аналогично, если бы входная последовательность обрабатывалась последовательно, но это не означает, что входная последовательность будет на самом деле обработана последовательно.

Вместо этого, этот запрос нужно переписать с использованием функции Select, принимающей индекс в качестве параметра:

var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);

Ради повышения производительности, любой метод, вызываемый из операторов запроса должны быть потокобезопасными в том смысле, что он не должен изменять поля или свойства (отсутствие сторонних эффектов, или функциональная чистота). Если эти методы потокобезопасны в том смысле, что они используют блокировки, потенциал параллелизма этого запроса будет ограничен, и это время будет пропорционально продолжительности захвата блокировки, деленное на суммарное время, проведенное внутри этой функции.

Вызов блокирующих функций или функций с интенсивным вводом/выводом

Иногда запросы выполняются длительное время не из-за интенсивных операций центрального процессора, а потому что процессор ждет чего-то, например, загрузки web-страниц или ответа от оборудования. PLINQ может эффективно распараллеливать такие запросы, если вы скажете ему об этом путем вызова метода WithDegreeOfParallelism после вызова метода AsParallel. Например, предположим мы хотим проверить существование шести веб-сайтов одновременно. Вместо того чтобы использовать асинхронные делегаты или вручную запускать шесть потоков, мы можем сделать это элементарно с помощью PLINQ запроса:

from site in new[]
{
  "www.albahari.com",
  "www.linqpad.net",
  "www.oreilly.com",
  "www.google.com",
  "www.takeonit.com",
  "stackoverflow.com"
}
.AsParallel().WithDegreeOfParallelism(6)
let p = new Ping().Send (site)
select new
{
  site,
  Result = p.Status,
  Time = p.RoundtripTime
}

Вызов метода WithDegreeOfParallelism заставляет PLINQ запускать указанное количество задач одновременно. Это необходимо, когда вы вызываете блокирующие функции, такие как Ping.Send, поскольку в противном случае PLINQ предполагает, что запрос выполняет задачи с высокой нагрузкой на центральный процессор и создает задачи соответствующим образом. Например, на двуядерном компьютере PLINQ по умолчанию может запускать только две задачи одновременно, что в данном случае является явно не желательным.

PLINQ обычно обрабатывает задачу в потоке, полученному из пула потоков. Вы можете увеличить минимальное количество потоков путем вызова функции ThreadPool.SetMinThreads.

В качестве другого примера давайте предположим, что мы пишем систему видеонаблюдения и хотим постоянно объединять изображение с четырех камер наблюдения в одно составное изображения для отображения его на мониторе. Мы можем представить камеру в виде следующего класса:

class Camera
{
  public readonly int CameraID;
  public Camera (int cameraID) { CameraID = cameraID; }
 
  // Получить изображение с камеры: возвращает простую строку, а не изображение
  public string GetNextFrame()
  {
    Thread.Sleep (123);       // Симулируем время для получения снимка
    return "Frame from camera " + CameraID;
  }
}

Для получения составного изображения мы должны вызвать метода GetNextFrame у каждого из четырех объектов камер. Предполагая, что это потребует больше количество операций ввода/вывода, мы можем вчетверо увеличить частоту кадров с помощью параллелизма – даже на одноядерной машине. PLINQ позволяет добиться этого с минимальными усилиями со стороны разработчика:

Camera[] cameras = Enumerable.Range (0, 4)    // Создаем 4 объекта камеры
  .Select (i => new Camera (i))
  .ToArray();
 
while (true)
{
  string[] data = cameras
    .AsParallel().AsOrdered().WithDegreeOfParallelism (4)
    .Select (c => c.GetNextFrame()).ToArray();
 
  Console.WriteLine (string.Join (", ", data));   // Отображаем данные...
}

Метод GetNextFrame блокирует ход выполнения программы, поэтому мы вызываем WithDegreeOfparallelism для получения необходимого уровня параллелизма. В нашем примере, блокирование происходит при вызове метода Sleep; в реальной жизни блокирование будет происходить из-за того, что получение изображения с камеры требует скорее интенсивного ввода/вывода, чем интенсивной загрузки процессора.

Вызов метода AsOrdered гарантирует, что изображения будут отображены в нужной последовательности. Поскольку в последовательности всего четыре элемента, это не окажет никакого влияния на производительность.

Изменение степени параллелизма

Вы можете вызвать метод WithDegreeOfParallelism только одни раз в PLINQ запросе. Если вам нужно вызвать этот метод снова, вы должны принудительно объединить результаты, а затем снова распараллелить запрос путем вызова метода AsParallel:

"The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism (2)
  .Where (c => !char.IsWhiteSpace (c))
  .AsParallel().WithDegreeOfParallelism (3)   // Принудительно объединяем результаты и распараллеливаем заново
  .Select (c => char.ToUpper (c))

Отмена операций

Отменить выполнение PLINQ запроса, результаты которого обрабатываются внутри цикла foreach, очень просто: прервите выполнение цикла foreach и запрос будет отменен автоматически во время неявного вызова метода Dispose енумератора.

Запрос, который использует операторы преобразования, получения элементов или агрегации, может быть отменен из другого потока с помощью маркера отмены (cancellation token). Для передачи маркера отмены, вызовите метод WithCancellation после вызова AsParallal, и передайте в свойстве Token объект типа CancellationTokenSource. Затем другой поток может вызвать метод Cancel этого маркера, который сгенерирует OperationCanceledException в коде, использующем результаты запроса:

IEnumerable<int> million = Enumerable.Range (3, 1000000);
 
var cancelSource = new CancellationTokenSource();
var primeNumberQuery =
  from n in million.AsParallel().WithCancellation (cancelSource.Token)
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;
 
new Thread (() => {
                    Thread.Sleep (100);      // Отменить запрос по истечению
                    cancelSource.Cancel();   // 100 миллисекунд.
                  }
           ).Start();
try 
{
  // Запуск выполнения запроса:
  int[] primes = primeNumberQuery.ToArray();
  // Мы никогда сюда не попадем, поскольку другой поток отменит выполнение
}
catch (OperationCanceledException)
{
  Console.WriteLine ("Query canceled");
}

PLINQ не прерывает работу потоков, поскольку это опасно. Вместо этого, после отмены, прежде чем завершить выполнение запроса, он ждет окончания выполнения каждого рабочего потока, обрабатывающего текущий элемент. Это значит, что выполнение любых внешних методов, вызванных при выполнении запроса, будет завершено.

Оптимизация PLINQ

Оптимизация со стороны выходной последовательности

Одно из преимуществ использования PLINQ заключается в простоте объединения результатов параллельной обработки выходной последовательности. Однако иногда все, что вам требуется, это выполнить некоторую функцию для каждого элемента:

foreach (int n in parallelQuery)
  DoSomething (n);

В этом случае, если вас не заботит порядок обработки элементов, вы можете повысить эффективность выполнения запроса путем использования метода ForAll.

Метод ForAll выполняет делегат для каждого выходного элемента ParallelQuery. Он изменяет внутреннее поведение обработки PLINQ запросов, обходя этапы объединения и перечисления результатов. Вот простой пример:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
image 

Объединение и перечисление результатов не является очень уж дорогой операцией, поэтому использование ForAll дает значительные преимущества только при большом количестве элементов, обработка которых выполняется очень быстро.

Оптимизация со стороны входной последовательности

PLINQ может использовать одну из трех стратегий секционирования (partitioning strategy), для назначения элементов входной последовательности рабочим потокам:

Стратегия Выделение элементов
Относительная производительность
Секционирование блоками (Chunk partitioning) Динамическое Средняя
Секционирование по диапазону (Range partitioning) Статическое От плохой до отличной
Хеш-секционирование (Hash partitioning) Статическое Плохая

Для операторов запросов, которые требуют сравнение элементов (GroupBy, Join, GroupJoin, Intersect, Except, Union, and Distinct), у вас нет выбора: PLINQ всегда использует хеш-секционирование (hash partitioning). Хеш-секционирование относительно менее эффективно, поскольку требует предварительного расчета хеш-значения каждого элемента последовательности (таким образом, элементы с одинаковым хеш-значением могут быть обработаны одним и тем же потоком). Если этот вариант вам покажется слишком медленным, у вас будет единственная возможность – это вызвать метод AsSequential для отключения распараллеливания.

Для всех остальных операторов запроса у вас есть выбор, использовать секционирование по диапазону или блоками. По умолчанию:

  • Если входная последовательность является индексируемой (если это массив или коллекция реализует интерфейс IList<T>), PLINQ будет использовать секционирование по диапазону.
  • В противном случае PLINQ будет использовать секционирование блоками.

Если кратко, то секционирование по диапазону быстрее на больших последовательностях, для которых обработка каждого элемента требует одинакового количества процессорного времени. В противном случае, секционирование блоками более эффективно.

Для принудительного использования секционирования по диапазону:

  • Если запрос начинается с вызова метода Enumerable.Range, замените его на ParallelEnumerable.Range
  • В противном случае, просто вызовите метод ToList или ToArray на входной последовательности (но вы должны понимать, что это, естественно, негативно скажется на производительности).

ParallelEnumerable.Range – это не просто краткая запись для Enumerable.Range(...).AsParallel(). Вызов этого метода влияет на производительность, поскольку в этом случае будет использоваться секционирование по диапазону.

Для принудительного использования секционирования блоками – оберните входную последовательность путем вызова метода Partitioner.Create (расположенного в пространстве имен System.Collection.Concurrent) следующим образом:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
  Partitioner.Create (numbers, true).AsParallel()
  .Where (...)

Второй аргумент метода Partitioner.Create указывает на то, что вы хотите выровнять загрузку (load-balance) запроса, другими словами, вы хотите использовать секционирование блоками.

Секционирование блоками работает следующим образом: рабочий поток периодически берет небольшой «блок» элементов из входной последовательности для обработки. PLINQ начинает с выделения блоков очень маленького размера (состоящих из одного-двух элементов), затем увеличивает размер блока по мере обработки запроса: это гарантирует то, что короткие последовательности распараллеливаются эффективным образом, а длинные последовательности не страдают от чрезмерных накладных расходов. Если рабочему потоку попались «простые» в обработке элементы (которые он быстро обработал), это приведет к получению дополнительных блоков. Это позволяет системе поддерживать потоки равномерно нагруженными (и балансировать нагрузку ядер процессора); единственным недостатком этого метода является использование синхронизации при получении элементов из разделяемой входной последовательности (обычно, с помощью эксклюзивной блокировки), что может привести к некоторым накладным расходам и конкуренции за общий ресурс.

image

 
Секционирование по диапазону обходит обычный процесс перебора входной последовательности и заранее выделяет одинаковое количество элементов для каждого рабочего потока, избегая конкуренции за входную последовательность. Но если одному из потоков достанутся более «простые» элементы и он закончит свое выполнение раньше, он будет бездействовать, пока остальные потоки продолжают свою работу. Алгоритм расчета простых чисел, рассмотренный ранее, может показать плохую производительность при использовании секционирования на основе диапазона. Примером, когда этот тип секционирования может работать эффективно, является вычисление суммы квадратных корней первых десяти миллионов целых чисел:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))

ParallelEnumerable.Range возвращает ParallelQuery<T>, поэтому последующий вызов AsParallel не нужен.

Секционирование на основе диапазона не обязательно выделяет диапазоны непрерывными блоками, вместо этого оно может использовать стратегию на основе фрагментов (“striping” strategy). Например, при наличии двух рабочих потоков, один из них может обрабатывать четные элементы, в то время как другой поток будет обрабатывать нечетные. Оператор TakeWhile практически всегда прибегает к использованию этой стратегии, поскольку он будет стараться избежать обработки ненужных элементов в конце последовательности.

четверг, 26 августа 2010 г.

[Перевод] Джозеф Албахари. Работа с потоками в C#. Часть 4

Это перевод четвертой части “Part 4. Advanced Threading” из замечательной серии статей Джозефа Албахари (Joseph Albahari)  о работе с потоками в языке C# и, как и при переводе третьей части, сейчас я публикую только, что появилось с выходом нового издания книги  “C# 4.0 in Nutshell: The Definitive Reference” и еще не опубликовано на русском языке на rsdn.ru.

Класс Barrier

Класс Barrier – это сигнальная конструкция, которая появилась в .Net Framework 4.0. Он реализует барьер потока исполнения, который позволяет множеству потоков встречаться в определенном месте во времени. Этот класс очень быстрый и эффективный, поскольку построен на основе Wait, Pulse и спин-блокировок.

Для использования этого класса необходимо:

  1. Создать экземпляр, указав количество потоков, которые будут встречаться одновременно(вы можете изменить это значение позднее путем вызова методов AddParticipants/RemoveParticipants).

Создание экземпляра класса Barrier со значением 3 приведет к тому, что вызов метода SignalAndWait блокируется до тех пор, пока этот метод не будет вызван трижды. Но, в отличие от CountdownEvent, он автоматически запускает этот процесс заново: вызов метода SignalAndWait снова блокируется до тех пор, пока он не вызовется трижды. Это позволяет идти нескольким потокам «в ногу» во время обработки набора задач.

clip_image001

В следующем примере, каждый из трех потоков выводит числа от 0 до 4, шагая в ногу с другими потоками:

static Barrier _barrier = new Barrier (3);
 
static void Main()
{
  new Thread (Speak).Start();
  new Thread (Speak).Start();
  new Thread (Speak).Start();
}
 
static void Speak()
{
  for (int i = 0; i < 5; i++)
  {
    Console.Write (i + " ");
    _barrier.SignalAndWait();
  }
}
0 0 0 1 1 1 2 2 2 3 3 3 4 4 4

Очень полезной возможностью класса Barrier является возможность указать при его конструировании постдействие (post-phase action). Этот делегат выполняется после того, как метод SignalAndWait вызван n раз, но перед тем, как потоки будут разблокированы. Если в нашем примере мы создадим экземпляр класса барьера следующим образом:

static Barrier _barrier = new Barrier (3, barrier => Console.WriteLine());

то вывод на экран будет таким:

0 0 0 
1 1 1
2 2 2
3 3 3
4 4 4

Постдействие может быть полезным при объединении данных рабочих потоков. В этом случае не нужно беспокоиться о порядке выполнения, поскольку до окончания выполнения этого действия все рабочие потоки остаются заблокированными.

Блокировки чтения/записи

Довольно часто экземпляры некоторых типов являются потокобезопасными при параллельных операциях чтениях, но не являются таковыми при параллельных обновлениях (при параллельных операциях чтения и обновления). Это также относится к таким ресурсам, как файлы. И хотя защита экземпляров таких типов простой эксклюзивной блокировкой для всех режимов доступа, решает проблему параллельного доступа, это может необоснованно ограничить параллелизм, когда к экземпляру обращаются много читателей при небольшом количестве обновлений. Типичным примером, когда может возникнуть такая ситуация – это сервер бизнес-приложений, в котором часто используемые данные кэшируются в статических полях для последующего быстрого доступа. Класс ReaderWriteLockSlim разработан для обеспечения максимальной доступности ресурса именно для этого случая.

Класс ReaderWriterLockSlim появился в .Net Framework 3.5 и предназначен для замены старого «тяжеловесного» класса ReaderWriterLock. Этот класс обладает аналогичной функциональности, но он в несколько раз медленнее и содержит ошибки проектирования в механизме обработки обновления блокировки.

Оба эти класса предусматривают два типа блокировки – блокировка чтения и блокировка записи:

  • Блокировка записи является полностью эксклюзивной.
  • Блокировка чтения совместима с другими блокировками чтения.

So, a thread holding a write lock blocks all other threads trying to obtain a read or write lock (and vice versa). But if no thread holds a write lock, any number of threads may concurrently obtain a read lock.

ReaderWriterLockSlim defines the following methods for obtaining and releasing read/write locks:

Т.е. поток, захвативший блокировку записи, блокирует все другие потоки, пытающиеся получить блокировку чтения или записи (и наоборот). Но если ни один поток не держит блокировку записи, любое количество потоков могут одновременно захватить блокировку чтения.

В классе ReaderWriterLockSLim определены следующие методы для захвата и освобождения блокировок чтения/записи:

public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();

В дополнение к этим методом, существуют также “Try” версии всех EnterXXX методов, которые принимают тайм-аут в качестве параметра, аналогично методу Monitor.TryEnter (таймауты могут часто возникать при работе с высокой степенью конкуренции). Класс ReaderWriterLock содержит аналогичные методы с именами AcquireXXX и ReleaseXXX, которые в случае возникновения тайм-аута генерируют ApplicationException, а не возвращают false.

Следующий код демонстрирует использование класса ReaderWriterLockSlim. Три потока постоянно читают список, в то время как два других потока добавляют случайное значение в него каждую секунду. Блокировка чтения защищает читателей списка, а блокировка записи защищает писателей.

class SlimDemo
{
    static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
    static List<int> _items = new List<int>();
    static Random _rand = new Random();

    static void Main()
    {
        new Thread(Read).Start();
        new Thread(Read).Start();
        new Thread(Read).Start();

        new Thread(Write).Start("A");
        new Thread(Write).Start("B");
    }

    static void Read()
    {
        while (true)
        {
            _rw.EnterReadLock();
            foreach (int i in _items) Thread.Sleep(10);
            _rw.ExitReadLock();
        }
    }

    static void Write(object threadID)
    {
        while (true)
        {
            int newNumber = GetRandNum(100);
            _rw.EnterWriteLock();
            _items.Add(newNumber);
            _rw.ExitWriteLock();
            Console.WriteLine("Thread " + threadID + " added " + newNumber);
            Thread.Sleep(100);
        }
    }

    static int GetRandNum(int max) { lock (_rand) return _rand.Next(max); }
}

В промышленном коде вам следует добавить блоки try/finally для гарантии освобождения блокировок при генерации исключения.

Вот результат выполнения:

Thread B added 61
Thread A added 83
Thread B added 55
Thread A added 33
...

Класс ReaderWriterLockSlim позволяет выполнять больше параллельных операций чтения, нежели обычная блокировка. Мы можем продемонстрировать это путем добавления следующей строки в метод Write в начале цикла while:

Console.WriteLine (_rw.CurrentReadCount + " concurrent readers");

Этот код практически всегда выводит “3 concurrent readers” (методы Read проводят основное время выполнения внутри циклов foreach). Помимо свойства ConcurrentReadCount класс ReaderWriterLockSlim предоставляет следующие свойства для мониторинга блокировок:

public bool IsReadLockHeld            { get; }
public bool IsUpgradeableReadLockHeld { get; }
public bool IsWriteLockHeld           { get; }
 
public int  WaitingReadCount          { get; }
public int  WaitingUpgradeCount       { get; }
public int  WaitingWriteCount         { get; }
 
public int  RecursiveReadCount        { get; }
public int  RecursiveUpgradeCount     { get; }
public int  RecursiveWriteCount       { get; }
Обновляемые блокировки и рекурсия

Иногда бывает полезным переключиться с блокировки чтения на блокировку записи одной атомарной операцией. Например, предположим, вы хотите добавить элемент в список, только если этого элемента там еще нет. В идеале, вы хотите минимизировать время удержания эксклюзивной блокировки (блокировки записи), и поступить следующим образом:

  1. Получить блокировку чтения
  2. Проверить, если элемент уже находится в списке, освободить блокировку и завершить выполнение функции
  3. Освободить блокировку чтения
  4. Захватить блокировку записи
  5. Добавить элемент

Проблема в том, что другой поток может вклиниться и модифицировать список (например, добавить такой же элемент) между шагами 3 и 4. ReaderWriterLockSlim решает эту задачу с помощью третьего типа блокировки, называемой обновляемой блокировкой (upgradeable lock). Обновляемая блокировка аналогична блокировке чтения за исключением того, что позднее с помощью атомарной операции она может быть расширена до блокировки записи. Вот как ею пользоваться:

  1. Вызвать метод EnterUpgradeableReadLock
  2. Выполнить операции, требующие доступ на чтение (например, проверить находится ли некоторый элемент в списке).
  3. Вызвать EnterWriteLock (это преобразует обновляемую блокировку в блокировку записи).
  4. Выполнить операции, требующие доступ на запись (например, добавить элемент в список).
  5. Вызвать ExitWriteLock (это преобразует блокировку записи обратно к обновляемой блокировке).
  6. Выполнить любые другие операции, требующие доступ на чтение
  7. Вызвать ExitUpgradeableReadLock.

С точки зрения вызывающего кода это очень похоже на вложенную или рекурсивную блокировку. Но функционально, на третьем шаге, ReaderWriteLockSlim освобождает блокировку чтения и автоматически получает новую блокировку записи.

Существует еще одно важное отличие между обновляемыми блокировками и блокировками чтения. В то время как обновляемая блокировка может совместно существовать с любым количеством блокировок чтения, в один момент времени может быть получена только одна обновляемая блокировка. Это предотвращает взаимоблокировку при преобразовании блокировок во время сериализации нескольких одновременных обновляемых блокировок, аналогично тому, как обновляются блокировки в SQL Server:

SQL Server

ReaderWriterLockSlim

Разделяемая блокировка

Блокировка на чтение

Эксклюзивная блокировка

Блокировка на запись

Обновляемая блокировка

Обновляемая блокировка

Мы можем показать использование обновляемой блокировки, модифицировав метод Write предыдущего примера таким образом, чтобы добавлять значение в список только тогда, когда его там еще нет:

while (true)
{
  int newNumber = GetRandNum (100);
  _rw.EnterUpgradeableReadLock();
  if (!_items.Contains (newNumber))
  {
    _rw.EnterWriteLock();
    _items.Add (newNumber);
    _rw.ExitWriteLock();
    Console.WriteLine ("Thread " + threadID + " added " + newNumber);
  }
  _rw.ExitUpgradeableReadLock();
  Thread.Sleep (100);
}

Класс ReaderWriterLock также поддерживает преобразование блокировок, но процесс этот ненадежен, поскольку он не поддерживает концепцию обновляемых блокировок. Вот почему разработчикам ReaderWriteLockSlim пришлось создавать полностью новый класс.

Рекурсия блокировок

Обычно, вложенные или рекурсивные блокировки с классом ReaderWriteLockSlim запрещены. Вот почему следующий код генерирует исключение:

var rw = new ReaderWriterLockSlim();
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

Однако этот код выполняется без ошибок, если создать объект ReaderWriteLockSlim следующим образом:

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);

Это гарантирует то, что рекурсивная блокировка может произойти только тогда, когда вы это планировали. Рекурсивная блокировка может добавить нежелательные сложности, поскольку возможно получение более одного типа блокировок:

rw.EnterWriteLock();
rw.EnterReadLock();
Console.WriteLine (rw.IsReadLockHeld);     // True
Console.WriteLine (rw.IsWriteLockHeld);    // True
rw.ExitReadLock();
rw.ExitWriteLock();

Базовое правило состоит в следующем: если вы уже захватили блокировку, последующая рекурсивная блокировка может быть менее, а не более сильной, согласно следующей шкале:

      Блокировка чтения  Обновляемая блокировка  Блокировка записи

Однако запрос на преобразование обновляемой блокировки к блокировке записи, всегда является корректным.

понедельник, 23 августа 2010 г.

[Перевод] Джозеф Албахари. Работа с потоками в C#. Часть 3

Не так давно вышло новое издание замечательной книги Джозефа Албахари (Josepth Albahari) “C# 4.0 in Nutshell: The Definitive Reference”, которая, как подсказывает название, обновилась с учетом нововведений, появившихся в последней версии языка программирования C# и платформы .Net. Еще до перевода этой книги на русский язык, одна из самых интересных и полезных частей книги – о многопоточном программировании на языке C# – была переведена Алексеем Кирюшкиным (a.k.a. Odi$$ey) и опубликована на сайте rsdn.ru (Часть 1 и Часть 2).

В связи с тем, что в новой версии .Net Framework появилось много новых полезных конструкций, призванных упростить работу с многопоточностью, эта часть книги была существенно расширена и переработана. Так, во-первых, добавилась новая часть “Part 5. Parallel Programming”, а во-вторых, части 3 и 4 подверглись существенным переработкам.

С любезного разрешения автора я решил продолжить работу, начатую Алексеем и перевести на русский язык 5-ю часть, а также те изменения, которые появились в частях предыдущих. Я не хочу здесь заново публиковать то, что уже опубликовано на русский язык (хотя, я думаю, что в скором времени полную версию предыдущих частей можно будет почитать на rsdn.ru), и оставлю здесь только те новшества, которые появились в новой версии .Net Framework. Так что, если вы уже знакомы с конструкциями многопоточности в предыдущих версиях платформы, это позволит вам не тратить лишнее время на то, что вы и так уже знаете, ну, а если нет – то лучше начать с чтения предыдущего издания этих частей, опубликованных на rsdn.ru.

Часть 3. Работа с потоками

Безопасная отмена операций

Как мы видели в предыдущем разделе, вызов метода Abort в большинстве случаев является опасным. В таком случае, альтернативой является реализация кооперативного паттерна(cooperative pattern), когда рабочий поток периодически проверяет флаг, указывающий на необходимость прекращения его работы (как в классе BackgroundWorker). Для отмены операции, ее инициатор просто устанавливает этот флаг, и затем ожидает завершения выполнения рабочего потока. Вспомогательный класс BackgroundWorker реализует этот паттерн отмены на основе флага (flag-based cancelation pattern), и вы легко можете реализовать его самостоятельно.

Очевидным недостатком этого метода является то, что рабочий метод должен быть написан специально с поддержкой отмены выполнения. Но, несмотря на это, это один из немногих безопасных паттернов отмены. Для демонстрации этого паттерна, мы сначала напишем класс, инкапсулирующий флаг отмены.

class RulyCanceler
{
    object _cancelLocker = new object();
    bool _cancelRequest;
    public bool IsCancellationRequested
    {
        get { lock (_cancelLocker) return _cancelRequest; }
    }

    public void Cancel() { lock (_cancelLocker) _cancelRequest = true; }

    public void ThrowIfCancellationRequested()
    {
        if (IsCancellationRequested) throw new OperationCanceledException();
    }
}

Класс OperationCanceledException – это класс в составе .NET Framework предназначенный именно для этих целей. Хотя для этих целей подойдет и любой другой класс исключения.

Мы можем использовать его следующим образом:

class Test
{
    static void Main()
    {
        var canceler = new RulyCanceler();
        new Thread(() =>
        {
            try { Work(canceler); }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Canceled!");
            }
        }).Start();
        Thread.Sleep(1000);
        canceler.Cancel();               // безопасная отмена.
    }

    static void Work(RulyCanceler c)
    {
        while (true)
        {
            c.ThrowIfCancellationRequested();
            // ...
            try { OtherMethod(c); }
            finally { /* необходимая очистка ресурсов */ }
        }
    }

    static void OtherMethod(RulyCanceler c)
    {
        // Выполняем некоторые действия ...
        c.ThrowIfCancellationRequested();
    }
}

Мы можем упростить наш пример, убрав класс RulyCanceler и добавив вместо него в классе Test статическое булево поле _cancelRequest. Однако, в случае исполнения метода Work несколькими потоками одновременно, установка флага _cancelRequest в true прервет выполнение всех рабочих потоков. Так что класс RulyCanceler является полезной абстракцией. Недостатком этого подхода является то, что глядя на сигнатуру метода Work не сразу понятно его предназначение:

static void Work (RulyCanceler c)
Может ли метод Cancel объекта RulyCanceler быть вызван из самого метода Work? В данном случае, ответ – нет, так что было бы хорошо, если бы система типов поддерживала бы это. .Net Framework 4.0 для этой цели предоставляет маркеры отмены (cancelation tokens).
Маркеры отмены

В .Net Framework 4.0 существует два типа, формализующие этот только что продемонстрированный кооперативный паттерн отмены: CancellationTokenSource и CancellationToken. Эти два типа работают в тандеме:

  • В классе CancellationTokenSource определен метод Cancel
  • В классе CancellationToken определено свойство IsCancellationRequested и метод ThrowIfCancellationRequested

Вместе, эти классы являются более сложной версией класса RulyCanceler, представленного в предыдущем примере. Но поскольку типы разделены, вы можете разделить возможность отмены операции от возможности проверки флага отмены.

Для использования этих типов, создайте вначале экземпляр класса CancellationTokenSource:

var cancelSource = new CancellationTokenSource();

Затем, передайте свойство Token этого объекта в метод, поддерживающий отмену:

new Thread (() => Work (cancelSource.Token)).Start();

Вот как будет определен метод Work:

void Work (CancellationToken cancelToken)
{
  cancelToken.ThrowIfCancellationRequested();
  ...
}

Когда вы хотите отменить выполнение метода, просто вызовите метод Cancel объекта cancelSource.

CancellationToken на самом деле является структурой, хотя ведет себя как класс. При копировании, копия ведет себя точно также и указывает на исходный объект CancellationTokenSource.

В структуре CancellationToken определены два дополнительных полезных метода. Первый – это WaitHandle, возвращающий дескриптор ожидания (wait handle), который переходит в сигнальное состояние при отмене. Второй – Register, который позволяет зарегистрировать метод обратного вызова и который будет вызван при отмене.

Маркеры отмены используются в самом .NET Framework, в частности, в следующих классах:

Большинство этих классов используют маркеры отмены в методах Wait. Например, если вы вызовете метод Wait объекта класса ManualResetEventSlim и укажете маркер отмены, другой класс сможет вызвать метод Cancel и отменить ожидание. Этот метод чище и безопаснее, нежели вызывать метод Interrupt заблокированного потока.

Отложенная инициализация

Типичной проблемой в многопоточной среде является отложенная инициализация разделяемого (shared) поля потокобезопасным способом. Такая потребность возникает, когда создание экземпляра некоторого типа является дорогостоящим:

class Foo
{
  public readonly Expensive Expensive = new Expensive();
  ...
}
class Expensive/* Предположим, что создание этого объекта является дорогим */ }

Проблема с этим кодом в том, что создание экземпляра класса Foo приводит к затратам на  создания экземпляра класса Expensive не зависимо от того, используется это поле или нет. Очевидным решением этой задачи является создание экземпляра по требованию:

class Foo
{
  Expensive _expensive;
  public Expensive Expensive       // Создать экземпляр класса Expensive отложенно
  {
    get
    {
      if (_expensive == null) _expensive = new Expensive();
      return _expensive;
    }
  }
  ...
}

В таком случае возникает вопрос: является ли этот код потокобезопасным? Даже не смотря на тот факт, что доступ к полю _expensive вне блока lock без использования барьеров памяти, давайте подумаем о том, что будет, если два потока обратятся к этому свойству одновременно. При выполнении каждого потока условие if будет выполнено и каждый поток получит разные экземпляры класса Expensive. Это может привести к тонким ошибкам, так что, в общем случае мы можем сказать, что этот код не является потокобезопасный.

Решение проблемы состоит в использовании блокировки вокруг проверки и инициализации объекта:

Expensive _expensive;
readonly object _expenseLock = new object();
 
public Expensive Expensive
{
  get
  {
    lock (_expenseLock)
    {
      if (_expensive == null) _expensive = new Expensive();
      return _expensive;
    }
  }
}
Lazy<T>

В .Net Framework 4.0 существует новый класс, под названием Lazy<T> для упрощения отложенной инициализации. Если объект этого класса будет создан с аргументом, равным true, он будет реализовывать только что описанный паттерн инициализации.

На самом деле класс Lazy<T> реализует более эффективную версию этого паттерна, под названием блокировка с двойной проверкой (double-checked locking). При блокировке с двойной проверкой выполняется дополнительная операция volatile чтение для избегания захвата блокировки в случае, если объект уже проинициализирован.

Для использования класса Lazy<T>, создайте класс с делегатом[S7] -фабрикой, который будет определять способ создания нового значения, и в качестве второго параметра передайте true. Затем, вы можете использовать это значение с помощью свойства Value:

Lazy<Expensive> _expensive = new Lazy<Expensive>
  (() => new Expensive(), true);
 
public Expensive Expensive { get { return _expensive.Value; } }

Если вы передадите false в конструктор класса Lasy<T>, тогда объект этого класса будет реализовывать потоконебезопасный паттерн отложенной инициализации, рассмотренный в начале этого раздела. Это имеет смысл, если вы будете применять Lazy<T> в однопоточноном контексте.

LazyInitializer

Класс LazyInitializer – это статический класс, который работает в точности как Lazy<T> за исключением следующего:

  • Его функциональность реализована в виде статического метода, который работает непосредственно с полем внутри вашего типа. Это устраняет дополнительный уровень косвенности и улучшает производительность в случае необходимости чрезвычайной оптимизации.
  • Он предоставляет еще один способ инициализации, при котором во время инициализации возможны гонки нескольких потоков.
Expensive _expensive;
public Expensive Expensive
{
  get   // Реализует блокировку с двойной проверкой
  {
    LazyInitializer.EnsureInitialized (ref _expensive,
                                      () => new Expensive());
    return _expensive;
  }
}

Вы также можете передать другой аргумент для того, чтобы указать на возможность гонок нескольких потоков, борющихся между собой за инициализацию ресурса. Этот способ кажется аналогичным нашему исходному потоконебезопасному примеру, за исключением того, что в данном случае первый поток «всегда выиграет», так что в конечном итоге всегда будет получен только один экземпляр. Преимущество этого подхода состоит в том, что он даже быстрее (на многоядерных компьютерах) чем блокировка с двойной проверкой, поскольку он может быть полностью реализован без блокировок. Такая чрезвычайная оптимизация требуется достаточно редко и она не обходится бесплатно:

  • Этот способ медленнее, когда количество потоков, борющихся за инициализацию, превосходит количество ядер.
  • Он потенциально расходует ресурсы процессора понапрасну, выполняя избыточную инициализацию.
  • Логика инициализации должна быть потокобезопасной (например, будет потоконебезопасно, если конструктор класса Expensive будет записывать что-либо в статические поля).
  • Если инициализатор создает объект, требующий очистки ресурсов (disposable object), ресурсы «лишнего» объекта не будут освобождены без дополнительной логики.

Для справки, вот как реализовывается блокировка с двойной проверкой:

volatile Expensive _expensive;
public Expensive Expensive
{
  get
  {
    if (_expensive == null)
    {
      var expensive = new Expensive();
      lock (_expenseLock) if (_expensive == null) _expensive = expensive;
    }
    return _expensive;
  }
}

А вот как реализован способ с возможностью гонок при инициализации:

volatile Expensive _expensive;
public Expensive Expensive
{
  get
  {
    if (_expensive == null)
    {
      var instance = new Expensive();
      Interlocked.CompareExchange (ref _expensive, instance, null);
    }
    return _expensive;
  }
}
Локальное хранилище потока

В этой статье делается серьезный акцент на конструкции синхронизации и на проблемы, который возникают при одновременном доступе различных потоков к одним и тем же данным. Однако иногда вы хотите иметь изолированные данные, гарантируя, что у каждого потока будет их отдельная копия. Локальные переменные предназначены именно для этого, но они применимы только для временных данных (transient data).

Решением является использование локального хранилища потока (thread-local storage). Вы можете увидеть противоречие в этом требовании: данные, которые вы хотите изолировать для каждого отдельного потока, являются временными по своей природе. Главная цель этих хранилищ – хранение «нетиповых» (“out-of-band”) данных, которые являются частью инфраструктуры приложения, такие как сообщения, транзакции и маркеры безопасности (security tokens). Передача таких данных через параметры методов является чрезвычайно грубым решением и усложняет все, кроме этих методов; хранение же такой информации в обыкновенных статических полях означает разделение этих данных всеми потоками.

Кроме того, локальное хранилище потока может быть полезным при оптимизации параллельного кода. Это позволяет каждому потоку единолично обращаться к своей собственной версии потоконебезопасного объекта без использования блокировок и без необходимости повторного создания объекта между вызовами методов.

Существует три способа реализации локального хранилища потока.

[ThreadStatic]

Самым простым способом использования локального хранилища потока является пометка  статического поля с помощью атрибута ThreadStatic:

[ThreadStatic] static int _x;

В этом случае каждый поток будет видеть независимую копию переменной _x.

К сожалению, атрибут [ThreadStatic] не работает с экземплярными полями (он просто ничего не делает в этом случае); а также нормально не работает с инициализаторами статических полей (field initializer), поскольку они выполняются только один раз при выполнении статического конструктора. Если вам нужно работать с экземплярными полями или значениями статических полей, отличных от значений по умолчанию, более подходящим выбором будет ThreadLocal<T>.

ThreadLocal<T>

Класс ThreadLocal<T> появился в .Net Framework 4.0 и он позволяет использовать локальное хранилище потока, как для статических, так и для экземплярных полей, а также позволяет задавать значение по умолчанию.

Вот как можно создать экземпляр ThreadLocal<int> со значением по умолчанию для каждого потока, равным 3:

static ThreadLocal<int> _x = new ThreadLocal<int> (() => 3);

Затем вы можете использовать свойство Value переменной _x для получения или установки значения, локального для каждого потока. Бонусом использования ThreadLocal является отложенная инициализация значений: фабричная функция выполняется при первом вызове (для каждого потока).

ThreadLocal<T> и экземплярные поля

Класс TrheadLocal<T> также полезен при работе с экземплярными полями и захваченными локальными переменными. Например, давайте рассмотрим задачу генерации случайных чисел в многопоточном окружении. Класс Random не потокобезопасен, поэтому нам либо нужно, либо воспользоваться блокировкой вокруг использования объекта класса Random (ограничивая одновременный доступ) либо генерировать объект класса Random для каждого потока. Класс ThreadLocal<T> легко решает вторую задачу:

var localRandom = new ThreadLocal<Random>(() => new Random());
Console.WriteLine (localRandom.Value.Next());

Наша фабричная функция для создания объекта Random немного упрощенная, поскольку конструктор без параметров класса Random основывается на системном времени для начальных значений. И это значение может быть одинаковым для двух разных объектов Random созданных с интервалом порядка 10 мс. Вот как это можно исправить:

var localRandom = new ThreadLocal<Random>
( () => new Random (Guid.NewGuid().GetHashCode()) );

Мы воспользуемся этим способом в 5-й части (см. пример параллельной проверки правописания в разделе “PLINQ”).

GetData and SetData

Третий подход основывается на использовании двух методов класса Thread: GetData и SetData. Эти методы сохраняют данные в специальных «слотах» потока. Метод Thread.GetData читает из  этого хранилища потока, а метод Thread.SetData – записывает туда. Для идентификации слота оба метода требуют объект LocalDataStoreSlot. Одна и та же ячейка может быть использована всеми потоками, но при этом каждый из них будет работать со своими собственными значениями. Вот пример:

class Test
{
  // Один и тот же объект LocalDataStoreSlot может использоваться всеми потоками.
  LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");
 
  // Это свойство содержит разное значение для каждого потока
  int SecurityLevel
  {
    get
    {
      object data = Thread.GetData (_secSlot);
      return data == null ? 0 : (int) data;    // null == поле неинициализировано
    }
    set { Thread.SetData (_secSlot, value); }
  }
  ...

В этом примере, мы вызываем Thread.GetNamedDataSlot, который создает новый именованный слот , что позволяет совместно использовать этот слот всему приложению. Создавая объект LocalDataStoreSlot явно и не задавая его имени, вы можете управлять областью видимостью слота самостоятельно:

class Test
{
  LocalDataStoreSlot _secSlot = new LocalDataStoreSlot();
  ...

Вызов метода Thread.FreeNamedDataSlot освободит все именованные слоты для всех потоков, но только после того, как будут освобождены все ссылки на LocalDataStoreSlot и он будет освобожден сборщиком мусора. Это гарантирует то, что если потокам все еще нужен этот слот и они содержат ссылку на соответствующий объект LocalDataStoreSlot, то они никогда внезапно не потеряют эти данные