|
|
|
|
|
并行性是在具有多個(gè)內(nèi)核的系統(tǒng)上并行執(zhí)行任務(wù)的能力。.NET Framework 4 中引入了對(duì) .NET 中并行編程的支持。.NET 中的并行編程使我們能夠更有效地使用系統(tǒng)資源,并通過更好的編程控制。本文討論了我們?nèi)绾卧?.NET Core 應(yīng)用程序中使用并行性。
要使用本文中提供的代碼示例,你應(yīng)該在系統(tǒng)中安裝 Visual Studio 2019。
在 Visual Studio 中創(chuàng)建 .NET Core 控制臺(tái)應(yīng)用程序項(xiàng)目
首先,讓我們?cè)?Visual Studio 中創(chuàng)建一個(gè) .NET Core 控制臺(tái)應(yīng)用程序項(xiàng)目。假設(shè)你的系統(tǒng)中安裝了 Visual Studio 2019,請(qǐng)按照下面概述的步驟在 Visual Studio 中創(chuàng)建一個(gè)新的 .NET Core 控制臺(tái)應(yīng)用程序項(xiàng)目。
在本文的后續(xù)部分中,我們將使用該項(xiàng)目來(lái)說(shuō)明 .NET Core 中的并行編程。
.NET Core 中的并發(fā)性和并行性
并發(fā)性和并行性是 .NET 和 .NET Core 中的兩個(gè)關(guān)鍵概念。盡管它們看起來(lái)相同,但它們之間存在細(xì)微差別。
考慮必須由應(yīng)用程序執(zhí)行的兩個(gè)任務(wù) T1 和 T2。如果一個(gè)任務(wù)處于執(zhí)行狀態(tài)而另一個(gè)任務(wù)等待輪到它,則這兩個(gè)任務(wù)處于并發(fā)執(zhí)行狀態(tài)。結(jié)果,其中一項(xiàng)任務(wù)先于另一項(xiàng)完成。相比之下,如果兩個(gè)任務(wù)同時(shí)執(zhí)行,則這兩個(gè)任務(wù)是并行執(zhí)行的。要實(shí)現(xiàn)任務(wù)并行,程序必須運(yùn)行在多核 CPU 上。
.NET Core 中的 Parallel.For 和 Parallel.ForEach
Parallel.For
循環(huán)執(zhí)行可以并行運(yùn)行的迭代。你可以監(jiān)視甚至操縱循環(huán)的狀態(tài)。Parallel.For
循環(huán)就像 for
循環(huán),只是它允許迭代在多個(gè)線程中并行運(yùn)行。
Parallel.ForEach
方法將要完成的工作拆分為多個(gè)任務(wù),每個(gè)任務(wù)對(duì)應(yīng)集合中的每一項(xiàng)。Parallel.ForEach
類似于 C# 中的 foreach
循環(huán),除了 foreach
循環(huán)在單個(gè)線程上運(yùn)行并且按順序進(jìn)行處理,而 Parallel.ForEach
循環(huán)在多個(gè)線程上運(yùn)行并且處理以并行方式進(jìn)行。
C# 中的 Parallel.ForEach 與 foreach
考慮下面的方法,它接受一個(gè)整數(shù)作為參數(shù),如果它是質(zhì)數(shù)則返回 true。
static bool IsPrime(int integer)
{
if (integer <= 1) return false;
if (integer == 2) return true;
var limit = Math.Ceiling(Math.Sqrt(integer));
for (int i = 2; i <= limit; ++i)
if (integer % i == 0)
return false;
return true;
}
我們現(xiàn)在將利用 ConcurrentDictionary
來(lái)存儲(chǔ)素?cái)?shù)和托管線程 ID。由于兩個(gè)范圍之間的質(zhì)數(shù)是唯一的,我們可以將它們用作鍵,將托管線程 ID 用作值。
.NET 中的并發(fā)集合包含在 System.Collections.Concurrent
命名空間內(nèi),并提供集合類的無(wú)鎖和線程安全實(shí)現(xiàn)。ConcurrentDictionary
類包含在 System.Collections.Concurrent
命名空間內(nèi),代表一個(gè)線程安全的字典。
以下兩個(gè)方法均使用 IsPrime
方法來(lái)檢查整數(shù)是否為素?cái)?shù),將素?cái)?shù)和托管線程 ID 存儲(chǔ)在 ConcurrentDictionary
的實(shí)例中,然后返回該實(shí)例。第一種方法使用并發(fā),第二種方法使用并行。
private static ConcurrentDictionary<int, int>
GetPrimeNumbersConcurrent(IList<int> numbers)
{
var primes = new ConcurrentDictionary<int, int>();
foreach (var number in numbers)
{
if(IsPrime(number))
{
primes.TryAdd(number,
Thread.CurrentThread.ManagedThreadId);
}
}
return primes;
}
private static ConcurrentDictionary<int, int>
GetPrimeNumbersParallel(IList<int> numbers)
{
var primes = new ConcurrentDictionary<int, int>();
Parallel.ForEach(numbers, number =>
{
if (IsPrime(number))
{
primes.TryAdd(number,
Thread.CurrentThread.ManagedThreadId);
}
});
return primes;
}
C# 中的并發(fā)與并行示例
以下代碼片段說(shuō)明了如何調(diào)用 GetPrimeNumbersConcurrent
方法來(lái)檢索 1 到 100 之間的所有質(zhì)數(shù)以及托管線程 ID。
static void Main(string[] args)
{
var numbers = Enumerable.Range(0, 100).ToList();
var result = GetPrimeNumbersConcurrent(numbers);
foreach(var number in result)
{
Console.WriteLine($"Prime Number:
{string.Format("{0:0000}",number.Key)},
Managed Thread Id: {number.Value}");
}
Console.Read();
}
當(dāng)你執(zhí)行上面的程序時(shí),你應(yīng)該看到如圖 1 所示的輸出:
圖1
如你所見,托管線程 ID 在每種情況下都是相同的,因?yàn)槲覀冊(cè)诖耸纠惺褂昧瞬l(fā)?,F(xiàn)在讓我們看看使用線程并行時(shí)輸出會(huì)是什么樣子。以下代碼片段說(shuō)明了如何使用并行性檢索 1 到 100 之間的素?cái)?shù)。
static void Main(string[] args)
{
var numbers = Enumerable.Range(0, 100).ToList();
var result = GetPrimeNumbersParallel(numbers);
foreach(var number in result)
{
Console.WriteLine($"Prime Number:
{string.Format("{0:0000}",number.Key)},
Managed Thread Id: {number.Value}");
}
Console.Read();
}
當(dāng)你執(zhí)行上面的程序時(shí),輸出應(yīng)該類似于圖 2 所示:
圖2
正如你在這里看到的,因?yàn)槲覀兪褂昧?Parallel.ForEach
,所以創(chuàng)建了多個(gè)線程,因此托管線程 ID 是不同的。
限制C#中的并行度
并行度是一個(gè)無(wú)符號(hào)整數(shù),表示你的查詢?cè)趫?zhí)行時(shí)應(yīng)利用的最大處理器數(shù)。換句話說(shuō),并行度是一個(gè)整數(shù),表示將在同一時(shí)間點(diǎn)執(zhí)行以處理查詢的最大任務(wù)數(shù)。
默認(rèn)情況下,Parallel.For
和 Parallel.ForEach
方法對(duì)衍生任務(wù)的數(shù)量沒有限制。因此,在上面顯示的 GetPrimeNumbersParallel
方法中,程序嘗試使用系統(tǒng)中的所有可用線程。
你可以利用 MaxDegreeOfParallelism
屬性來(lái)限制衍生任務(wù)的數(shù)量(每個(gè) Parallel 類的 ParallelOptions 實(shí)例)。如果 MaxDegreeOfParallelism
設(shè)置為 -1,則并發(fā)運(yùn)行的任務(wù)數(shù)沒有限制。
以下代碼片段顯示了如何設(shè)置 MaxDegreeOfParallelism
以使用最多 75% 的系統(tǒng)資源。
new ParallelOptions
{
MaxDegreeOfParallelism = Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0))
};
請(qǐng)注意,在上面的代碼片段中,我們將處理器數(shù)量乘以二,因?yàn)槊總€(gè)處理器包含兩個(gè)內(nèi)核。以下是 GetPrimeNumbersParallel
方法的完整更新代碼,供你參考:
private static ConcurrentDictionary<int, int> GetPrimeNumbersParallel(IList<int> numbers)
{
var primes = new ConcurrentDictionary<int, int>();
Parallel.ForEach(numbers, number =>
{
new ParallelOptions
{
MaxDegreeOfParallelism = Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0))
};
if (IsPrime(number))
{
primes.TryAdd(number,
Thread.CurrentThread.ManagedThreadId);
}
});
return primes;
}
確定 C# 中的并行循環(huán)是否完成
請(qǐng)注意,Parallel.For
和 Parallel.ForEach
都返回一個(gè) ParallelLoopResult
實(shí)例,該實(shí)例可用于確定并行循環(huán)是否已完成執(zhí)行。以下代碼片段顯示了如何使用 ParallelLoopResult
。
ParallelLoopResult parallelLoopResult = Parallel.ForEach(numbers, number =>
{
new ParallelOptions
{
MaxDegreeOfParallelism = Convert.ToInt32(Math.Ceiling(
(Environment.ProcessorCount * 0.75) * 2.0))
};
if (IsPrime(number))
{
primes.TryAdd(number, Thread.CurrentThread.ManagedThreadId);
}
});
Console.WriteLine("IsCompleted: {0}", parallelLoopResult.IsCompleted);
要在非泛型集合中使用 Parallel.ForEach
,你應(yīng)該利用 Enumerable.Cast
擴(kuò)展方法將集合轉(zhuǎn)換為泛型集合,如下面的代碼片段所示。
Parallel.ForEach(nonGenericCollection.Cast<object>(),
currentElement =>
{
});
最后一點(diǎn),不要假設(shè) Parallel.For
或 Parallel.ForEach
的迭代將始終并行執(zhí)行。你還應(yīng)該注意線程親和性問題。你可以閱讀有關(guān)任務(wù)并行性的潛在缺陷。
總結(jié)
本文討論了如何在C#中使用Parallel.For
和Parallel.ForEach
的問題。通過本文的學(xué)習(xí),你應(yīng)該了解了C#中Parallel.For
和Parallel.ForEach
的使用方法及注意問題。
相關(guān)文章