Это перевод пятой части статьи Джозефа Албахари (Joseph Albahari) о работе с потоками в C# - “Part 5: Parallel Programming”. Статья целиком очень большая (более 50 страниц), поэтому перевод разбит на несколько частей.
Часть 5: Параллельное программирование
В этом разделе мы рассмотрим новые API для многопоточного программирования, появившиеся в .Net Framework 4.0 повышающие эффективность использования многоядерных процессоров:
- Parallel LINQ или PLINQ
- Класс Parallel
- Конструкции для параллелизма задач
Эти библиотеки широко известны (неформально) как PFX (Parallel Framework). Класс Parallel совместно с конструкциями для параллелизма задач (task parallelism constructs) называются Task Parallel Library или TPL.
В Framework 4.0 также добавлен ряд низкоуровневых конструкций, которые будут также полезны и при обычной работе с многопоточностью. Мы рассмотрели их ранее:
- Сигнальные конструкции с низким временем ожидания (SemaphoreSlim, ManualResetEventSlim, CountdownEvent и Barrier)
- Маркеры отмены для реализации кооперативной отмены (cooperative cancellation)
- Классы для отложенной инициализации
Вы должны понимать фундаментальные принципы, рассмотренные в частях 1-4, в особенности понятия блокировки и безопасности потоков, прежде чем приступить к чтению этого раздела.
Все исходные коды из этого раздела доступны в виде интерактивных примеров на LINQPad. Чтобы получить доступ к этим примерам кликните на Download More Samples на вкладке LINQPad Samples, слева вверху и выберите C# 4.0 in a Nutshell: More Chapters.
Почему PFX?
В последнее время скорость центральных процессоров перестала стремительно расти и производители сосредоточились на увеличении количества ядер. И это является проблемой для нас, как программистов, поскольку при этом наш код, предназначенный для работы в однопоточной среде, не начинает работать быстрее автоматически.
Воспользоваться преимуществами нескольких ядер относительно просто для серверных приложений, где каждый поток может независимо обрабатывать отдельный запрос от клиента, но этого значительно сложнее добиться для клиентских приложений, поскольку в этом случае обычно потребуется преобразовать ваш код, интенсивно использующий вычисления , следующим образом:
- Секционировать (partition) его на мелкий фрагменты.
- Выполнить каждый фрагмент параллельно в разных потоках.
- После окончания выполнения объединить результаты потокобезопасным и эффективным способом.
И хотя вы можете сделать все это с помощью классических конструкций для работы с многопоточностью, такое решение будет неуклюжим, особенно шаги секционирования и объединения результатов. Дополнительной проблемой является то, что эти конструкции для обеспечения потокобезопасности используют типичную стратегию блокировок, которая при одновременной работе нескольких потоков с одними и теми же данными приводит к высокой конкуренции.
Библиотека PFX специально разработана, чтобы помочь разработчику в таких случаях.
Параллельное программирование (parallel programming) – это техника программирования, которая использует преимущества многоядерных или многопроцессорных компьютеров и является подмножеством более широкого понятия многопоточности (multithreading).
Концепции библиотеки PFX
Существует две стратегии разделения работы между потоками: параллелизм данных (data parallelism) и параллелизм задач (task parallelism).
Когда определенный набор задач должен быть выполнен на большом объеме данных, мы можем распараллелить его таким образом, чтобы каждый поток выполнял (одинаковый) набор задач на подмножестве значений. Этот подход называется параллелизмом данных, поскольку мы разбиваем данные между потоками. В противоположность этому, при параллелизме задач мы разбиваем задачи, и в таком случае каждый поток выполняет разную задачу.
В общем случае, параллелизм данных проще и лучше масштабируется на высокопроизводительном оборудовании, поскольку уменьшает или полностью устраняет разделяемые данные (таким образом уменьшая конкурентный доступ и другие проблемы безопасности потоков). Кроме того, параллелизм данных основывается на том факте , что данных обычно значительно больше, чем дискретных задач, что увеличивает потенциал параллельной обработки .
Параллелизм данных также способствует структурному параллелизму (structured parallelism), что означает, что модули параллельной работы начинают и заканчивают работу в одном и том же месте вашей программы. В противоположность этому, параллелизм задач ведет к неструктурированному параллелизму, что означает, что места, в которых модули начинают и заканчивают свою работу, могут быть разбросаны по всему вашему приложению. Структурный параллелизм проще и менее подвержен ошибкам, и позволяет вам возложить всю сложную работу по секционированию данных и координации потоков (и даже объединению результатов) на библиотеки.
Компоненты библиотеки PFX
PFX включает в себя два уровня функциональности. Верхний уровень состоит из двух библиотек для структурного параллелизма данных: PLINQ и класс Parallel. Нижний уровень содержит классы для параллелизма задач – плюс дополнительный набор конструкций, упрощающих решение задач параллелельного программирования.
Библиотека PLINQ предоставляет самую богатую функциональность: она автоматизирует все этапы параллелизма, включая разделение работы на задачи, выполнение этих задач различными потоками и объединение результатов в одну выходную последовательность. Ее использование называется декларативным (declarative), поскольку вы просто объявляете (declare) то, что вы хотите выполнить параллельно (подобно запросам LINQ), и позволяете ей заботиться о деталях реализации. В противоположность этому, другие подходы являются императивными, в этом случае вам нужно явно написать код по разделению задачи и объединению результатов. В случае класса Parallel, вы должны объединить результаты самостоятельно; в случае конструкций параллелизма задач, вы должны еще и разделить задачу самостоятельно:
Разделение задачи | Объединение результатов | |
PLINQ | Да | Да |
Класс Parallel | Да | Нет |
Параллелизм задач из PFX | Нет | Нет |
Параллельные коллекции (concurrent collections) и спин-примитивы (spinning primitives) помогают в решении низкоуровневых задач параллельного программирования, что является очень важным, поскольку PFX проектировалась для работы не только на современном оборудовании, но и с будущими поколениями процессоров со значительно большим количеством ядер. Если вы хотите перетащить кучу срубленных деревьев и в вашем распоряжении есть 32 человека, самым сложным является координация людей таким образом, чтобы они не путались друг у друга под ногами . Тоже самое касается распределения алгоритма между 32 ядрами: при использовании обычных блокировок для защиты общих ресурсов, результирующая блокировка ресурса может привести к тому, что только часть ядер будет на самом деле занята одновременно. Параллельные коллекции оптимизированы специально для высококонкурентного доступа и сосредоточены на минимизации или устранении блокировок. PLINQ и класс Parallel в частности, основываются на параллельных коллекциях и спин-примитивах для эффективной реализации своей работы.
ПРИМЕЧАНИЕ. PFX и классическая многопоточность
Классические сценарии работы с многопоточностью полезны даже на одноядерных компьютерах, даже при отсутствии реального распараллеливания. Эти сценарии мы рассмотрели ранее: они включают в себя задачи по созданию отзывчивого пользовательского интерфейса и загрузке двух веб-страниц одновременно.
Некоторые конструкции, рассмотренные в разделе параллельного программирования, также иногда применяются в классических задача многопоточности. В частности:
- Конструкции параллелизма задач полезны, когда вы хотите выполнить некоторую операцию в потоке пула потоков, а также управляете последовательностью выполняемых действий с помощью продолжений (continuations) и родительских/дочерних задач.
- Параллельные коллекции иногда полезны, когда вам нужна потокобезопасная очередь, стек или словарь.
- С помощью BlockingCollection легко реализовать структуры типа поставщик/потребитель.
Когда использовать PFX
Основной сценарий использования PFX – параллельное программирование: повышение эффективности использования многоядерных процессоров для ускорения выполнения кода с большим количеством вычислений.
Самая большая сложность в эффективном использовании многоядерных процессоров заключается в законе Амдала, который заключается в том, что максимальное повышение эффективности за счет распараллеливания ограничено кодом, который должен вычисляться последовательно. Например, если только две трети времени выполнения алгоритма могут быть распараллелены, тогда вы никогда не получите троекратного прироста производительности, даже при бесконечном количестве ядер.
Так что, прежде чем продолжать, стоит проверить, находится ли узкое место в распараллеливаемом коде. Кроме того, стоит подумать о том, должен ли ваш код выполнять такое количество вычислений; оптимизация обычно является самым простым и эффективным подходом. Хотя здесь может потребоваться компромисс, поскольку использование некоторых техник оптимизации может усложнить распараллеливание кода.
Наиболее просто получить выгоду при решении так называемых задач чрезвычайного параллелизма (embarrassingly parallel problems) – когда вся задача может быть легко разбита на подзадачи, которые могут выполняться сами по себе (структурный параллелизм хорошо подходит под этот круг задач). Примерами могут служить задачи обработки изображений, трассировка лучей, подходы на основе «грубой силы» (brute force) в математике или криптографии. Примером задач, не относящихся к этой категории, является реализация оптимизированной версии алгоритма быстрой сортировки, для получения хороших результатов нужно хорошо постараться и в этом случае понадобится неструктурный параллелизм .
PLINQ
PLINQ автоматически распараллеливает локальные LINQ запросы. PLINQ легко использовать, поскольку разбиение задачи и объединения результатов возлагается на него.
Для использования PLINQ, просто вызовите метод AsParallel() на входной последовательности и затем продолжайте, как обычно использовать LINQ запросы. Следующий запрос вычисляет простые числа от 3 до 100,00 – используя все ядра процессора:
// Вычислить простые числа, используя простой (неоптимальный) алгоритм.
IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
AsParallel – это метод расширения класса System.Linq.ParallelEnumerable. Он оборачивает входные данные в последовательность, построенную на основе ParallelQuery<TSource>, что приводит к тому, что операторы запросов LINQ будут вызывать альтернативный набор методов расширения, определенные в классе PrallelEnumerable. В этом классе определены параллельные реализации всех стандартных операторов запроса, которые, по сути, разбивают входную последовательность на части, каждая из которых обрабатывается разными потоками, объединяя затем результаты в одну выходную последовательность:
Вызов метода AsSequential() разворачивает последовательность таким образом, чтобы последующие операторы вызывали стандартные операторы запросов и выполнялись последовательно. Это необходимо перед вызовов методов со сторонними эффектами или потоконебезопасных методов.
mySequence.AsParallel() // //Оборачивает последовательность в ParallelQuery<int>
ParallelQuery<int>
.Where (n => n > 100) //Результатом является еще одна последовательность ParallelQuery<int>
.AsParallel() //Нет необходимости – это неэффективно!
.Select (n => n * n)
Не все операторы запросов могут эффективно распараллеливаться. Те операторы, для которых это невозможно, реализуются в PLINQ последовательно . PLINQ может также выполнить оператор последовательно, если он подозревает, что накладные расходы на параллелизм на самом деле замедлят выполнение запроса.
PLINQ применим только к локальным коллекциям: он не работает с LINQ to SQL или с Entity Framework, поскольку в этих случаях LINQ запросы преобразуются в SQL, которые затем выполняются на сервере баз данных. Однако вы можете использовать PLINQ для выполнения дополнительных локальных запросов на результатах, полученных запросами к базе данных.
ПРЕДУПРЕЖДЕНИЕ
Если в процессе выполнения запроса генерируется исключение, то PLINQ перебрасывает AggregateException, в свойстве InnerExceptions которого будет содержаться настоящее исключение (или исключения). Подробнее см. раздел Работа с AggregateException.
ПРИМЕЧАНИЕ. Почему AsParallel не используется по умолчанию?
Поскольку AsParallel прозрачно распараллеливает запросы LINQ, возникает вопрос: «Почему Майкрософт просто не распараллеливает стандартные операторы запросов и не использует PLINQ по умолчанию?»
Существует несколько причин, почему этот подход не используется по умолчанию. Во-первых, польза от использования PLINQ будет только при наличии значительного количества вычислительных задач, распределенных по рабочим потокам. Многие запросы LINQ to Objects выполняются очень быстро, и распараллеливание не только не нужно, но накладные расходы на разделение данных, объединения результатов и координацию дополнительных потоков могут, на самом деле только ухудшить производительность.
Кроме того:
-
результат работы PLINQ запроса (по умолчанию) могут отличаться от результатов выполнения LINQ запроса порядком выходных элементов.
-
PLINQ заворачивает исключения в AggregateException (для обработки нескольких возможных сгенерированных исключений).
-
PLINQ дает ненадежные результаты, если запросы вызывают потоконебезопасные методы.
-
и, в конце концов, PLINQ предоставляет слишком мало возможностей для оптимизации и тонкой настройки. Обременяя стандартные API LINQ to Objects такими нюансами приведет к беде.
Баллистика параллельного выполнения
Как и обычные LINQ запросы, PLINQ запросы выполняются отложено. Это означает, что выполнение начнется только тогда, когда вы начнете использовать результаты запроса – обычно в цикле foreach (также это может произойти в операторе преобразования, таком как ToArray или операторе, возвращающем единственный элемент или значение).
Однако когда вы перебираете результаты запроса, вычисление несколько отличается от того, что происходит при выполнении обычного запроса. Последовательный запрос полностью управляется вызывающим кодом “pull”-образом: каждый элементы входной последовательности извлекается только по требованию вызывающего кода. Параллельный запрос обычно использует независимые потоки для извлечения элементов из входной последовательности несколько опережая потребности вызывающего кода (как телесуфлер для комментатора или «анти-шок» системы в CD-проигрывателях). Затем он обрабатывает элементы параллельно с помощью цепочки запросов (query chain), сохраняя результаты в маленьком буфере, которые доступны оттуда вызывающему коду по требованию. Если вызывающий код приостанавливает или прекращает выполнение до перебора всех элементов результата, обработчик запроса также приостанавливает или прекращает свое выполнение, чтобы не расходовать понапрасну память и процессорное время.
Вы можете изменить поведение буферизации PLINQ путем вызова метода WithMergeOption после вызова метода AsParallel. Значение по умолчанию – AutoBuffered – в целом дает наилучший результат. NoBuffered отключает буферизацию и полезно, когда вы хотите получить результат как можно скорее; FullyBuffered полностью кэширует результаты, прежде чем вернуть его вызывающему коду (операторы Ordery и Reverse работают, естественно, именно так, как и операторы получения элемента, агрегации или преобразования).
PLINQ и сортировка
Побочным эффектом параллельного выполнения операторов запросов является то, что при объединении результатов порядок элементов не обязательно совпадает с исходным, что и показано на предыдущей диаграмме. Другими словами, гарантии соблюдения порядка следования элементов последовательности, которые выполняются операторами LINQ, не выполняются операторами PLINQ.
Если вам нужно сохранить порядок элементов, вам нужно сделать это явно путем вызова метода AsOrdered() после вызова AsParallel():
myCollection.AsParallel().AsOrdered()...
Вызов метода AsOrdered на последовательностях большого размера снижает производительность, поскольку в этом случае необходимо сохранять исходную позицию элементов.
Вы можете нивелировать снижение эффективности вызова AsOrdered путем последующего вызова AsUnordered: это создает определенную точку, после которой запрос может выполняться эффективнее. Так что, если вы хотите сохранить порядок следования элементов входной последовательности только для первых двух операторов, вы можете поступить таким образом:
inputSequence.AsParallel().AsOrdered()
.QueryOperator1()
.QueryOperator2()
.AsUnordered() // С этого места порядок уже не имеет значения
.QueryOperator3()
...
Оператор AsOrdered не применяется по умолчанию, поскольку для большинства запросов не важен исходный порядок входной последовательности. Другими словами, если бы AsOrdered применялся по умолчанию, то для повышения эффективности вам бы пришлось вызывать AsUnordered для большинства своих параллельных запросов, что было бы слишком обременительным.
Ограничения PLINQ
На данный момент существует ряд практических ограничений на то, что PLINQ может распараллеливать. Эти ограничения могут быть ослаблены после выпусков сервис паков или будущих версий .Net Framework.
Следующие операторы запросов предотвращают распараллеливание запросов, если исходные элементы не находятся в своих исходных позициях:
-
Take, TakeWhile, Skip и SkipWhile
-
Индексные версии методов Select, SelectMany и ElementAt
Большинство операторов изменяют индексную позицию элементов (включая операторы, удаляющие элементы, такие как Where). Это значит, что если вы хотите использовать эти операторы, то они должны идти вначале запроса.
Следующие операторы являются распараллеливаемыми, но они используют дорогую стратегию секционирования (partitioning stategy), что иногда может привести к менее эффективному результату, нежели последовательная обработка.
-
Join, GroupBy, GroupJoin, Distinct, Union, Intersect, and Except
-
Join, GroupBy, GroupJoin, Distinct, Union, Intersect и Except
Версия оператора Aggregate, принимающая начальное значение в своем стандартном виде не может быть распараллелена, для этого PLINQ предоставляет специальную перегрузку.
Все остальные операторы являются распараллеливаемыми, хотя их применение и не гарантирует то, что ваш запрос будет распараллелен. PLINQ может выполнить ваш запрос последовательно если определит, что накладные расходы на распараллеливание приведут к замедлению выполнения этого запроса. Вы можете изменить это поведение и принудительно заставить запрос выполняться параллельно путем вызова следующего метода после AsParallel():
.WithExecutionMode (ParallelExecutionMode.ForceParallelism)
Пример: параллельная проверка правописания
Предположим мы хотим реализовать проверку правописания, которая будет быстро выполняться с очень большими документами и будет использовать все доступные ядра процессора. Выражая свой алгоритм с помощью LINQ запроса, мы можем очень легко распараллелить эту задачу.
Для начала нужно загрузить словарь английских слов в HashSet для эффективного поиска:
if (!File.Exists ("WordLookup.txt")) //Содержит порядка 150,000 слов
new WebClient().DownloadFile (
"http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
var wordLookup = new HashSet<string> (
File.ReadAllLines ("WordLookup.txt"),
StringComparer.InvariantCultureIgnoreCase);
Затем мы воспользуемся этим контейнером для создания тестового «документа», состоящего из миллиона случайных слов. После создания этого документа мы добавим в него несколько ошибок:
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; //Добавляю несколько
wordsToTest [23456] = "wubsie"; //ошибок.
Теперь мы можем выполнить параллельную проверку правописания путем тестирования wordsToTest с помощью wordLookup. PLINQ делает это с легкостью:
var query = wordsToTest
.AsParallel()
.Select ((word, index) => new IndexedWord { Word=word, Index=index })
.Where (iword => !wordLookup.Contains (iword.Word))
.OrderBy (iword => iword.Index);
query.Dump(); //Отобразить результаты в LINQPad
Вот вывод, полученный с помощью LINQPad:
OrderedParallelQuery<IndexedWord> (2 элемента) |
Word | Index |
woozsh | 12345 |
wubsie | 23456 |
IndexedWord – это структура, определенная следующим образом:
struct IndexedWord { public string Word; public int Index; }
Метод wordLookup.Contains, используемый в предикате, делает разумным распараллеливание этого запроса.
Мы можем немного упростить этот запрос путем использования анонимного типа вместо структуры IndexedWord. Однако, это негативно скажется на производительности, поскольку анонимные типы (будучи классами и, таким образом, ссылочными типами) влекут за собой выделение памяти в управляемой куче и последующую сборку мусора.
Эта разница может быть не заметна при выполнении запросов последовательно, но при параллельном выполнении запросов, использование стековых переменных может быть более эффективным. Это связано с тем, что использование стековых переменных легко распараллеливается (поскольку у каждого потока есть свой стек), в то время как все потоки должны состязаться за использование одной кучи, управляемой одним менеджером памяти и сборщиком мусора.
Использование ThreadLocal<T>
Давайте расширим наш пример и распараллелим создание нашего тестового списка. Мы создаем этот список с помощью LINQ запроса, значит это должно быть просто. Вот последовательная версия:
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
К сожалению, вызов метода random.Next не является потокобезопасным, так что наше решение не сводится к простому вызову AsParallel существующего запроса. Возможным решением является создание функции, которая будет использовать блокировку вокруг вызова random.Next; однако, это ограничит параллелизм. Более подходящим решением является использование ThreadLocal<Random> для создания отдельного объекта Random для каждого потока. Тогда мы сможем распараллелить запрос следующим образом:
var localRandom = new ThreadLocal<Random>
( () => new Random (Guid.NewGuid().GetHashCode()) );
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
.Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
.ToArray();
Наша фабричная функция по созданию объекта Random передает хэш-код Guid-а для гарантии того, что два объекта Random, созданные за короткий промежуток времени, будут генерировать разные случайные последовательности.
ПРИМЕЧАНИЕ. Когда использовать PLINQ
Весьма соблазнительно, найти LINQ запросы в своих существующих приложениях и поэкспериментировать с их распараллеливанием. В большинстве случаев от этого не будет никакого толку, поскольку большинство задач, для которых LINQ оказывается налучшим решением, выполняются очень быстро и, таким образом, не получат никакой выгоды от распараллеливания. Лучше найти узкое место в блоке с большим количеством вычислений и ответить на такой вопрос: «Можно ли это представить в виде LINQ запроса?» (Положительным побочным эффектом такого преобразования является то, что обычно LINQ запросы занимают меньше места и более читабельны).
PLINQ хорошо подходит для задач чрезвычайного параллелизма (embarrassingly parallel problems). Он также хорошо походит для структурных блоков, таких как вызов нескольких сервисов одновременно (см. раздел Вызов блокирующих функций или функций с интенсивным ввода/вывода).
PLINQ является плохим выбором для работы с фотографиями, поскольку узким местом будет объединение миллионов точек в одну выходную последовательность. Вместо этого, лучше сохранить точки напрямую в массив или блок неуправляемой памяти и использовать класс Parallel или параллелизм задач для работы с многопоточностью. (Хотя возможно решить проблему объединения результатов путем вызова метода ForAll. Это может иметь смысл, если алгоритм обработки изображения сам привел к использованию LINQ).
Функциональная чистота
Поскольку PLINQ выполняет ваш запрос в параллельных потоках, вы должны заботиться о том, чтобы не выполнять никаких потоконебезопасных операций. В частности, изменение переменных является побочным эффектом, а потому не является потокобезопасным:
// Этот запрос умножает каждый элемент на значение его позиции.
// Если передать на вход Enumerable.Range(0, 999), то результат должен быть возведен в квадрат.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
Мы можем сделать увеличение i на единицу потокобезопасной операцией путем использования блокировок или Interlocked операций, но проблема все еще будет в том, что i не обязательно будет соответствовать позиции элемента во входной последовательности. И вызов AsOrdered не исправит проблему, поскольку AsOrdered гарантирует только то, что элементы выходной последовательности будут располагаться аналогично, если бы входная последовательность обрабатывалась последовательно, но это не означает, что входная последовательность будет на самом деле обработана последовательно.
Вместо этого, этот запрос нужно переписать с использованием функции Select, принимающей индекс в качестве параметра:
var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);
Ради повышения производительности, любой метод, вызываемый из операторов запроса должны быть потокобезопасными в том смысле, что он не должен изменять поля или свойства (отсутствие сторонних эффектов, или функциональная чистота). Если эти методы потокобезопасны в том смысле, что они используют блокировки, потенциал параллелизма этого запроса будет ограничен, и это время будет пропорционально продолжительности захвата блокировки, деленное на суммарное время, проведенное внутри этой функции.
Вызов блокирующих функций или функций с интенсивным вводом/выводом
Иногда запросы выполняются длительное время не из-за интенсивных операций центрального процессора, а потому что процессор ждет чего-то, например, загрузки web-страниц или ответа от оборудования. PLINQ может эффективно распараллеливать такие запросы, если вы скажете ему об этом путем вызова метода WithDegreeOfParallelism после вызова метода AsParallel. Например, предположим мы хотим проверить существование шести веб-сайтов одновременно. Вместо того чтобы использовать асинхронные делегаты или вручную запускать шесть потоков, мы можем сделать это элементарно с помощью PLINQ запроса:
from site in new[]
{
"www.albahari.com",
"www.linqpad.net",
"www.oreilly.com",
"www.google.com",
"www.takeonit.com",
"stackoverflow.com"
}
.AsParallel().WithDegreeOfParallelism(6)
let p = new Ping().Send (site)
select new
{
site,
Result = p.Status,
Time = p.RoundtripTime
}
Вызов метода WithDegreeOfParallelism заставляет PLINQ запускать указанное количество задач одновременно. Это необходимо, когда вы вызываете блокирующие функции, такие как Ping.Send, поскольку в противном случае PLINQ предполагает, что запрос выполняет задачи с высокой нагрузкой на центральный процессор и создает задачи соответствующим образом. Например, на двуядерном компьютере PLINQ по умолчанию может запускать только две задачи одновременно, что в данном случае является явно не желательным.
PLINQ обычно обрабатывает задачу в потоке, полученному из пула потоков. Вы можете увеличить минимальное количество потоков путем вызова функции ThreadPool.SetMinThreads.
В качестве другого примера давайте предположим, что мы пишем систему видеонаблюдения и хотим постоянно объединять изображение с четырех камер наблюдения в одно составное изображения для отображения его на мониторе. Мы можем представить камеру в виде следующего класса:
class Camera
{
public readonly int CameraID;
public Camera (int cameraID) { CameraID = cameraID; }
// Получить изображение с камеры: возвращает простую строку, а не изображение
public string GetNextFrame()
{
Thread.Sleep (123); // Симулируем время для получения снимка
return "Frame from camera " + CameraID;
}
}
Для получения составного изображения мы должны вызвать метода GetNextFrame у каждого из четырех объектов камер. Предполагая, что это потребует больше количество операций ввода/вывода, мы можем вчетверо увеличить частоту кадров с помощью параллелизма – даже на одноядерной машине. PLINQ позволяет добиться этого с минимальными усилиями со стороны разработчика:
Camera[] cameras = Enumerable.Range (0, 4) // Создаем 4 объекта камеры
.Select (i => new Camera (i))
.ToArray();
while (true)
{
string[] data = cameras
.AsParallel().AsOrdered().WithDegreeOfParallelism (4)
.Select (c => c.GetNextFrame()).ToArray();
Console.WriteLine (string.Join (", ", data)); // Отображаем данные...
}
Метод GetNextFrame блокирует ход выполнения программы, поэтому мы вызываем WithDegreeOfparallelism для получения необходимого уровня параллелизма. В нашем примере, блокирование происходит при вызове метода Sleep; в реальной жизни блокирование будет происходить из-за того, что получение изображения с камеры требует скорее интенсивного ввода/вывода, чем интенсивной загрузки процессора.
Вызов метода AsOrdered гарантирует, что изображения будут отображены в нужной последовательности. Поскольку в последовательности всего четыре элемента, это не окажет никакого влияния на производительность.
Изменение степени параллелизма
Вы можете вызвать метод WithDegreeOfParallelism только одни раз в PLINQ запросе. Если вам нужно вызвать этот метод снова, вы должны принудительно объединить результаты, а затем снова распараллелить запрос путем вызова метода AsParallel:
"The Quick Brown Fox"
.AsParallel().WithDegreeOfParallelism (2)
.Where (c => !char.IsWhiteSpace (c))
.AsParallel().WithDegreeOfParallelism (3) // Принудительно объединяем результаты и распараллеливаем заново
.Select (c => char.ToUpper (c))
Отмена операций
Отменить выполнение PLINQ запроса, результаты которого обрабатываются внутри цикла foreach, очень просто: прервите выполнение цикла foreach и запрос будет отменен автоматически во время неявного вызова метода Dispose енумератора.
Запрос, который использует операторы преобразования, получения элементов или агрегации, может быть отменен из другого потока с помощью маркера отмены (cancellation token). Для передачи маркера отмены, вызовите метод WithCancellation после вызова AsParallal, и передайте в свойстве Token объект типа CancellationTokenSource. Затем другой поток может вызвать метод Cancel этого маркера, который сгенерирует OperationCanceledException в коде, использующем результаты запроса:
IEnumerable<int> million = Enumerable.Range (3, 1000000);
var cancelSource = new CancellationTokenSource();
var primeNumberQuery =
from n in million.AsParallel().WithCancellation (cancelSource.Token)
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
new Thread (() => {
Thread.Sleep (100); // Отменить запрос по истечению
cancelSource.Cancel(); // 100 миллисекунд.
}
).Start();
try
{
// Запуск выполнения запроса:
int[] primes = primeNumberQuery.ToArray();
// Мы никогда сюда не попадем, поскольку другой поток отменит выполнение
}
catch (OperationCanceledException)
{
Console.WriteLine ("Query canceled");
}
PLINQ не прерывает работу потоков, поскольку это опасно. Вместо этого, после отмены, прежде чем завершить выполнение запроса, он ждет окончания выполнения каждого рабочего потока, обрабатывающего текущий элемент. Это значит, что выполнение любых внешних методов, вызванных при выполнении запроса, будет завершено.
Оптимизация PLINQ
Оптимизация со стороны выходной последовательности
Одно из преимуществ использования PLINQ заключается в простоте объединения результатов параллельной обработки выходной последовательности. Однако иногда все, что вам требуется, это выполнить некоторую функцию для каждого элемента:
foreach (int n in parallelQuery)
DoSomething (n);
В этом случае, если вас не заботит порядок обработки элементов, вы можете повысить эффективность выполнения запроса путем использования метода ForAll.
Метод ForAll выполняет делегат для каждого выходного элемента ParallelQuery. Он изменяет внутреннее поведение обработки PLINQ запросов, обходя этапы объединения и перечисления результатов. Вот простой пример:
"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
Объединение и перечисление результатов не является очень уж дорогой операцией, поэтому использование ForAll дает значительные преимущества только при большом количестве элементов, обработка которых выполняется очень быстро.
Оптимизация со стороны входной последовательности
PLINQ может использовать одну из трех стратегий секционирования (partitioning strategy), для назначения элементов входной последовательности рабочим потокам:
Стратегия | Выделение элементов
|
Относительная производительность
|
Секционирование блоками (Chunk partitioning) | Динамическое | Средняя |
Секционирование по диапазону (Range partitioning) | Статическое | От плохой до отличной
|
Хеш-секционирование (Hash partitioning) | Статическое | Плохая |
Для операторов запросов, которые требуют сравнение элементов (GroupBy, Join, GroupJoin, Intersect, Except, Union, and Distinct), у вас нет выбора: PLINQ всегда использует хеш-секционирование (hash partitioning). Хеш-секционирование относительно менее эффективно, поскольку требует предварительного расчета хеш-значения каждого элемента последовательности (таким образом, элементы с одинаковым хеш-значением могут быть обработаны одним и тем же потоком). Если этот вариант вам покажется слишком медленным, у вас будет единственная возможность – это вызвать метод AsSequential для отключения распараллеливания.
Для всех остальных операторов запроса у вас есть выбор, использовать секционирование по диапазону или блоками. По умолчанию:
-
Если входная последовательность является индексируемой (если это массив или коллекция реализует интерфейс IList<T>), PLINQ будет использовать секционирование по диапазону.
-
В противном случае PLINQ будет использовать секционирование блоками.
Если кратко, то секционирование по диапазону быстрее на больших последовательностях, для которых обработка каждого элемента требует одинакового количества процессорного времени. В противном случае, секционирование блоками более эффективно.
Для принудительного использования секционирования по диапазону:
-
Если запрос начинается с вызова метода Enumerable.Range, замените его на ParallelEnumerable.Range
-
В противном случае, просто вызовите метод ToList или ToArray на входной последовательности (но вы должны понимать, что это, естественно, негативно скажется на производительности).
ParallelEnumerable.Range – это не просто краткая запись для Enumerable.Range(...).AsParallel(). Вызов этого метода влияет на производительность, поскольку в этом случае будет использоваться секционирование по диапазону.
Для принудительного использования секционирования блоками – оберните входную последовательность путем вызова метода Partitioner.Create (расположенного в пространстве имен System.Collection.Concurrent) следующим образом:
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (...)
Второй аргумент метода Partitioner.Create указывает на то, что вы хотите выровнять загрузку (load-balance) запроса, другими словами, вы хотите использовать секционирование блоками.
Секционирование блоками работает следующим образом: рабочий поток периодически берет небольшой «блок» элементов из входной последовательности для обработки. PLINQ начинает с выделения блоков очень маленького размера (состоящих из одного-двух элементов), затем увеличивает размер блока по мере обработки запроса: это гарантирует то, что короткие последовательности распараллеливаются эффективным образом, а длинные последовательности не страдают от чрезмерных накладных расходов. Если рабочему потоку попались «простые» в обработке элементы (которые он быстро обработал), это приведет к получению дополнительных блоков. Это позволяет системе поддерживать потоки равномерно нагруженными (и балансировать нагрузку ядер процессора); единственным недостатком этого метода является использование синхронизации при получении элементов из разделяемой входной последовательности (обычно, с помощью эксклюзивной блокировки), что может привести к некоторым накладным расходам и конкуренции за общий ресурс.
Секционирование по диапазону обходит обычный процесс перебора входной последовательности и заранее выделяет одинаковое количество элементов для каждого рабочего потока, избегая конкуренции за входную последовательность. Но если одному из потоков достанутся более «простые» элементы и он закончит свое выполнение раньше, он будет бездействовать, пока остальные потоки продолжают свою работу. Алгоритм расчета простых чисел, рассмотренный ранее, может показать плохую производительность при использовании секционирования на основе диапазона. Примером, когда этот тип секционирования может работать эффективно, является вычисление суммы квадратных корней первых десяти миллионов целых чисел:
ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))
ParallelEnumerable.Range возвращает ParallelQuery<T>, поэтому последующий вызов AsParallel не нужен.
Секционирование на основе диапазона не обязательно выделяет диапазоны непрерывными блоками, вместо этого оно может использовать стратегию на основе фрагментов (“striping” strategy). Например, при наличии двух рабочих потоков, один из них может обрабатывать четные элементы, в то время как другой поток будет обрабатывать нечетные. Оператор TakeWhile практически всегда прибегает к использованию этой стратегии, поскольку он будет стараться избежать обработки ненужных элементов в конце последовательности.
Параллельное программирование в .NET Framework 4.0
ОтветитьУдалитьhttp://www.enterra.ru/blog/parallel-programming-in-net-framework-4-0/