介紹#
本文旨在深入了解多線程、異步、任務、和並行計算,它們統稱為並行編程(Task Parallel Library)。
多線程和異步#
多線程和異步是兩個不同的概念,如果分不清就容易寫出以下錯誤代碼:
void button1_Click()
{
new Thread(() =>
{
var client = new WebClient();
var content = client.DownloadString("https://myesn.cn");
Console.WriteLine(content);
}).Start();
}
以上代碼在按鈕點擊時,創建一個線程去下載網頁內容,其目的是為了避免阻塞 UI 線程。
但這是一種低效的實現,要理解這一點,需要從計算機組成原理說起,在電腦主機的硬體裡,有很多硬體子系統具備 “IO 操作的 DMA(Direct Memory Access)模式”,即直接內存訪問。在 .NET 中 CLR 所提供的異步編程模型就是讓我們充分利用硬體 DMA 功能來降低對 CPU 的壓力。
上述代碼的圖形示意如下:
低效的原因是該 Thread 在執行內部代碼時會一直佔用 CPU 資源,直到其執行完畢。如果改用異步的方式實現:
void button1_Click()
{
var client = new WebClient();
client.DownloadStringCompleted += (sender, e) =>
{
Console.WriteLine(e.Result);
};
client.DownloadStringAsync(new Uri("https://myesn.cn"));
}
改造後的代碼採用了異步模式,底層使用線程池管理(CLR Thread Pool)。異步操作啟動時,CLR 將下載網頁操作交給線程池中的線程執行。開始 IO 操作時,異步將工作線程還給線程池,不再佔用 CPU 資源。異步完成後,WebClient 會通知下載完成事件,使 CLR 響應異步操作完成。這樣,異步模式借助線程池大大節省了 CPU 資源。圖形示意如下:
所以,多線程和異步的執行流程大致如下:
兩者的適用場景:CPU 密集型用多線程,I/O 密集型用異步(讀寫和數據傳輸)。
任何與讀寫和數據傳輸有關的,就屬於 I/O 密集型,否則就是 CPU 密集型,也叫計算密集型。
線程同步#
在多線程環境中,線程同步是為了確保對共享資源的安全訪問,防止多個線程同時修改共享資源而導致數據不一致性或其他問題。通常使用鎖定機制來實現線程同步。
在面向對象語言中,數據類型可以分為值類型和引用類型。值類型包括整數、浮點數、構造體等,而引用類型則是指向對象的引用,如類、數組等。
在大多數情況下,我們可以在引用類型上實現線程同步,即通過鎖定共享資源的對象來確保多個線程對該資源安全訪問。這可以使用內置的鎖定機制(例如使用關鍵字 lock
或 synchronized
)或其他同步工具來實現。
然而,由於值類型的拷貝行為以及每個線程擁有自己的堆棧,值類型無法直接被鎖定和等待。每個線程操作的是自己的值類型變量副本,而不會相互干擾。
在 C# 中,使用微軟提供的關鍵字語法糖 lock
,lock
關鍵字實際上是對 Monitor
類的簡化使用。lock
關鍵字會自動調用 Monitor.Enter
和 Monitor.Exit
方法來對一個對象進行鎖定和解鎖操作。
信號同步是一種線程間通信的機制,用於在多個線程之間進行協調和同步操作。它可以確保線程在特定條件滿足時進行等待,以及在滿足條件時通知其他線程繼續執行。而信號同步機制中涉及到的類型都繼承自抽象類 WaitHandle
,它們的關係如下:
EventWaitHandle
是一個由操作系統內核產生的布爾值,表示阻塞狀態。調用Set
方法可以將其置為true
(有信號true
,無信號false
),解除線程阻塞。AutoResetEvent
和ManualResetEvent
都是EventWaitHandle
的子類。AutoResetEvent
在調用Set
方法後,會自動將阻塞狀態重置為false
,只有一個等待線程會被喚醒。ManualResetEvent
在調用Set
方法後,不會自動重置阻塞狀態,所有等待線程都會被喚醒,直到調用Reset
方法將阻塞狀態重置為false
。
Semaphore
維護著一個由系統內核產生的整型變量作為計數器,如果計數器值為 0,則表示等待;如果大於 0,則解除阻塞,並將計數器遞減。初始化時可以限制最多能夠等待的線程數量。Mutex
解決的是跨應用程序域的線程阻塞和解鎖的能力。它也維護著一個由系統內核產生的標誌位,用於同步對共享資源的訪問。只有一個線程能夠獲取到 Mutex 的鎖進行訪問,其他線程需要等待鎖釋放。
使用 AutoResetEvent 舉例如下:
var test = new Test();
test.StartThread();
Console.ReadKey();
test.SendSignal();
Console.ReadKey();
class Test
{
private AutoResetEvent _autoResetEvent { get; set; } = new AutoResetEvent(false);
public void StartThread()
{
new Thread(() =>
{
Console.WriteLine("線程1開啟,等待信號...");
_autoResetEvent.WaitOne(); //todo:處理一些複雜工作
Console.WriteLine("線程1繼續工作...");
}).Start();
new Thread(() =>
{
Console.WriteLine("線程2開啟,等待信號...");
_autoResetEvent.WaitOne(); //todo:處理一些複雜工作
Console.WriteLine("線程2繼續工作...");
}).Start();
}
public void SendSignal() => _autoResetEvent.Set();
}
首先創建一個 AutoResetEvent
實例並設置初始值為 false
代表無信號,然後調用 StartThread 函數啟動兩個線程,每個線程內部都等待一個 signal(信號),然後調用 SendSignal 函數其內部使用 Set()
發送一個信號量,這時會發現只有一個等待線程被喚醒,要想所有等待線程都被喚醒,就使用 ManualResetEvent
。
只要是引用類型,就可以隨便加鎖嗎?#
加鎖是一種線程同步機制,通過鎖住共享資源來確保在多線程訪問時只有一個線程能夠佔用。但並不是所有的對象都可以被用作鎖。
選擇鎖對象時,需要注意:
- 鎖對象應該是在多個線程中可見的同一對象
- 在非靜態方法中,靜態變量不應該作為鎖對象
- 值類型不能作為鎖對象,值類型無法直接被鎖定和等待。每個線程操作的是自己的值類型變量副本,而不會相互干擾。
- 避免將字符串作為鎖對象,字符串在內存中作為常量存在,當多個變量被賦值相同的字符串,它們引用的是同一塊內存空間
- 降低鎖對象的可見性,字符串是可見範圍最廣的鎖對象,其次為
typeof(class)
的返回結果,因為它是 class 所有實例都指向 typeof 返回的結果
類(class)的靜態方法應當保證線程安全,非靜態方法不需要保證線程安全。
一般來說,鎖對象也不應該是一個公共變量或屬性。 .NET 一些常用集合類型(System.Collections.ICollection 的實現)比如 List 等,都提供了公有屬性 SyncRoot
讓我們可以實現線程安全的的集合操作,但集合操作的大部分應用場景是單線程操作,線程同步本身比較耗時,這個字段公開是為了讓調用者決定它操作時是否需要線程安全,但一般來說在多線程的情況下,還是建議使用線程安全的集合(System.Collections.Concurrent 命名空間下):ConcurrentBag、ConcurrentDictionary 等。
線程的 IsBackground#
在 .NET 中線程可以設置為前台運行(默認)或後台運行,每個線程都有 IsBackground
屬性:
- 前台線程(false 默認值):當所有的前台線程執行完成後,應用程序會立即退出,通常,我們將需要完成的關鍵任務設置為前台線程,確保它們被完整地執行。
- 後台線程(true):在應用程序主線程結束時也會隨之結束的線程,當只剩下後台線程運行時,應用程序會立即退出,不會等待後台線程完成。通常,將一些非關鍵的、輔助性的任務設置為後台線程可以使得應用程序更快地退出,例如日誌記錄、監控等。
線程並不會立即開始#
大多數操作系統都不是實時操作系統,包括 Windows。線程的執行不是立即發生的,而是由操作系統根據自己的調度算法來決定何時執行哪個線程。每個線程被分配一小段 CPU 時間來執行工作,因此即使有多個線程同時運行,也會感覺它們幾乎在同時執行。系統會在適當的時機根據算法決定下一個時間點去調度哪個線程。
線程不是編程語言自帶的東西,它的調度是一個非常複雜的過程,線程之間的切換需要花一定時間和空間,並且,它不實時。比如:
for (int i = 0; i < 10; i++)
{
new Thread(() =>
{
Console.WriteLine(i);
}).Start();
}
Output:
從輸出的結果可以看到,線程並不是立即啟動的(多個線程打印了相同的 i
值,比如 5),在循環內直接啟動線程,每個線程都會共享相同的變量 i
。當某個線程開始執行時,可能會出現其他線程已經修改了 i
的情況,因此多個線程可能會訪問到相同的 i
值。
這是因為線程在不同的 CPU 核心上運行,而 CPU 與內存之間具有 寄存器、L1 Cache、L2 Cache、L3 Cache、內存
多級結構,如果不對內存進行鎖定,那麼在一個 CPU 核心修改了變量的值但是還沒有寫回到內存中,而另一個 CPU 讀取了舊的值時,便會出現髒讀。
如果想按照預期結果執行(每個線程負責接收屬於自己的 i 值),通過將啟動線程的行為封裝到函數中,在每次調用函數時,會將當前的 i
作為參數傳遞給該函數,創建一個新的局部變量 i
。這樣每個線程都有自己獨立的局部變量 i
,不會被其他線程影響。因此,每個線程可以得到預期的不同 i
值:
for (int i = 0; i < 10; i++)
{
StartThread(i);
}
void StartThread(int i)
{
new Thread(() =>
{
Console.WriteLine(i);
}).Start();
}
線程的優先級(ThreadPriority)#
線程在 C# 中具有不同的優先級(ThreadPriority),我們啟動的所有 Thread,包括 ThreadPool 和 Task,默認是 Normal 級別。優先級涉及到操作系統對線程的調度,Windows 系統是一個基於線程優先級的搶佔式調度模式,優先級高的總是能獲取更多的 CPU 時間,並且在已經就緒時(表示線程已經準備好開始執行,並等待操作系統進行線程調度,例如線程已創建並啟動,沒有被阻塞或掛起),總是會優先執行。
一般不建議修改線程的優先級,除非是非常關鍵的線程,高優先級的線程應該具備運行時間短,並立刻進入等待狀態的特性,否則會長時間佔用 CPU 資源導致各種問題。
取消運行中的線程(Thread Cancel)#
在一定時間後,取消正在執行的線程,有聯眾:
- 線程不能立即啟動,也不能立即停止,無論採用哪種方式通知線程停止,它都會忙完最緊要的事情,而後在它覺得合適時取消線程。比如
Thread.Abort
,如果線程執行的是一段非托管代碼,就不會拋出線程取消異常,只有當代碼回到 CLR 中,才會引發線程取消異常,當然,異常也不是立即引發的。 - 取消線程依賴於線程能否響應停止請求。線程應該提供 Canceled 接口,並在工作期間檢測 Canceled 狀態。只有在檢測到 Canceled 為 true 時,線程才會退出。
.NET 提供了標準的取消模式:協作式取消(Cooperative Cancellation),就是上面第 2 點提到的機制,比如:
var cts = new CancellationTokenSource();
new Thread(() =>
{
while (true)
{
if (cts.IsCancellationRequested)
{
Console.WriteLine("線程被取消");
break;
}
Thread.Sleep(100);
}
}).Start();
Console.ReadKey();
cts.Cancel();
主線程通過 CancellationTokenSource
的 Cancel
方法通知工作線程退出,工作線程以固定頻率檢測外界是否有 Cancel 信號傳入並在合適的時機退出。工作線程自身起到了主要作用並確保正確停止。
CancellationTokenSource
的 Token
有一個 Register
方法在 cts.Cancel()
時被觸發:
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("cts canceled"));
Console.ReadKey();
cts.Cancel();
ThreadPool 和 Thread 的取消模式相同。
控制線程數量#
通過任務管理器 > 性能 > CPU 展示的數據,可以算出每個進程平均擁有 10 個線程左右,所以每個程序都不會啟動太多線程:
在網絡編程中使用多線程為每個 socket 連接開啟一個線程進行監聽請求時,如果用戶數增多,線程數量也會增加。當線程數量達到一定數量時,會導致計算機資源管理不過來。每個線程需要分配一定的內存空間,而 32 位系統的內存限制通常在 2GB - 3GB 左右,當線程數量達到一定數量時,會耗盡全部內存。此外,過多的線程也會導致 CPU 在線程之間切換的開銷過大,損耗大量的 CPU 時間。對於像 Socket 這類 I/O 密集型的應用,更適合使用異步方式處理。
創建過多的線程會導致系統資源的過度佔用,會嚴重影響性能甚至導致系統崩潰。並且線程切換開銷過大,即線程很難得到足夠的 CPU 時間,表現就是需要等待相當長的時間才能執行線程內的操作。
在實際開發中,避免創建過多的線程,合理利用線程池或異步方式來處理任務,以提高性能並減少資源消耗,異步和線程池技術能夠高效管理大量線程,實際工作的線程數量較少。
線程池#
線程的空間開銷主要來自:
- 線程內核對象(Thread Kernel Object):每個線程都會創建一個這樣的對象,它主要包含線程上下文信息,佔用的內存在 700 字節左右
- 線程環境塊(Thread Environment Block):佔用 4KB 內存
- 用戶模式棧(User Mode Stack),即線程棧:線程棧用於保存方法的參數、局部變量和返回值。每個線程棧佔用 1MB 的內存。要用完這些內存很簡單,寫一個不能結束的遞歸方法,讓方法參數和返回值不停地消耗內存,很快就會發生 OutOfMemoryException
- 內核模式棧(Kernel Mode Stack):當調用操作系統的內核模式函數時,系統會將函數參數從用戶模式棧複製到內核模式棧。會佔用 12KB 內存
線程的時間開銷來自:
- 線程創建的時候,系統相繼初始化以上這些(空間開銷)內存空間
- 接著 CLR 會調用所有加載 DLL 的 DLLMain 方法,並傳遞連接標誌(線程終止的時候,也會調用 DLL 的 DLLMain 方法,並傳遞分離標誌)
- 線程上下文切換:一個系統中會加載很多的進程,而一個進程又包含若干個線程。但是一個 CPU 在任何時候都只能有一個線程在執行。為了讓每個線程看上去都在運行,系統會不斷地切換 “線程上下文”:每個線程大概得到幾十毫秒的執行時間片,然後就會切換到下一個線程了。這個過程大概又分為以下 5 個步驟:
- 進入內核模式
- 將上下文信息(主要是一些 CPU 寄存器信息)保存到正在執行的線程內核對象上
- 系統獲取一個 Spinlock,並確定下一個要執行的線程,然後釋放 Spinlock。如果下一個線程不在同一個進程內,則需要進行虛擬地址交換
- 從將被執行的線程內核對象上載入上下文信息
- 離開內核模式
線程的創建和銷毀有時間和空間的代價,為了管理線程的使用,微軟提供了線程池技術。線程池會在工作完成後將線程回收到池中,以供其他任務使用,線程的創建和銷毀由 CLR 的算法來決定。在實際項目中,更推薦使用線程池來管理線程,可以使用 ThreadPool 和 BackgroundWorker 這兩個類來實現,使用簡單方便,比如:
using System.ComponentModel;
// ThreadPool
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("from ThreadPool"));
// BackgroundWorker
var bw = new BackgroundWorker();
bw.DoWork += (object? sender, DoWorkEventArgs e) => Console.WriteLine("from BackgroundWorker");
bw.RunWorkerAsync();
Console.ReadKey();
ThreadPool 和 BackgroundWorker 是兩種不同的線程處理技術,它們在使用場景和特點上有一些區別:
- ThreadPool(線程池):線程池是一種管理和復用線程的機制,通過預先創建一組線程,並對其進行調度和管理,以便在需要執行任務時分配可用線程。ThreadPool 適用於並行執行大量短期小任務的情況,可以避免頻繁地創建和銷毀線程的開銷。可以使用
ThreadPool.QueueUserWorkItem
方法或Task.Run
方法將工作項添加到線程池中。 - BackgroundWorker(後台工作者):BackgroundWorker 是一個封裝了異步操作的組件,它簡化了在 WinForms 和 WPF 應用程序中使用後台線程執行長時間運行任務的過程。BackgroundWorker 提供了進度報告和完成事件,可以輕鬆地在 UI 線程和後台線程之間進行通信。它適用於需要在後台線程執行較長時間運行的任務,並且還需要更新 UI 的情況。
任務#
**[Task](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=net-7.0)
是 .NET 4+ 提供的一種用於異步編程的高級抽象 **。它可以表示一個異步操作或一段可調度的代碼,任務可以被分配給線程池中的線程來執行,Task 是超越 ThreadPool 和 BackgroundWorker 的存在,為 ThreadPool 提供了更多的 API 來管理線程,可以使用 Status 屬性以及 IsCanceled、 IsCompleted和 IsFaulted 屬性來確定任務的狀態:
// 創建一個 CancellationTokenSource 用於模擬取消 Task
var cts = new CancellationTokenSource();
Task.Run(() =>
{
Console.WriteLine("我是異步線程...");
}).ContinueWith(t =>
{
if (t.IsCanceled)
{
Console.WriteLine("線程被取消了");
}
if (t.IsFaulted)
{
Console.WriteLine("發生異常而被取消了");
}
if (t.IsCompleted)
{
Console.WriteLine("線程成功執行完成了");
}
});
Console.ReadKey();
// 取消 Task
// cts.Cancel();
ContinueWith
天然支持任務完成的通知、數據返回、任務取消、異常等情況的處理。Task 的 Result
屬性可以拿到線程執行完返回的值,同時阻塞線程直到拿到返回的結果。
一般使用 Task.Factory.StartNew
來實例化和啟動 Task,Task.Factory.ContinueWhenAll
(等待所有) 和 Task.Factory.ContinueWhenAny
(等待其一) 用來操作多個 Task 的執行結果。
如果想讓 Task 變為同步,只需調用 Wait
方法即可。
async / await#
在方法或表達式上使用 async 修飾符
,則稱為異步方法,且異步方法的返回類型必須是 Task
、Task<T>
或者 ValueTask<T>
,await 運算符
等待一個異步操作完成,並且可以暫時中斷當前方法的執行,直到異步操作完成後再繼續執行剩下的代碼,它倆幾乎是成對出現。
同步等待和異步等待#
思考以下代碼,最終輸出的結果大約是多少:
using System.Diagnostics;
var sw = Stopwatch.StartNew();
var task = MyMethod();
Thread.Sleep(4000);
var result = task.Result;
Console.WriteLine(sw.ElapsedMilliseconds);
static async Task<string> MyMethod()
{
await Task.Delay(5000);
return "aaa";
}
這段代碼中,首先調用了異步方法 MyMethod
,開啟了一個線程。在這個線程中,使用 await
異步等待了 5000ms。同時,在主線程中使用同步等待了 4000ms。
在主線程等待了 4000ms 後,又有一個變量在等待異步線程返回一個值。在這之前,主線程和異步線程都在同時進行等待。因此,在等待異步方法返回結果的過程中,還需要等待額外的 1000ms。
最後,當 MyMethod
方法返回結果 "aaa" 後,輸出了程序執行的時間為 5000ms。
在異步線程執行之後,如果沒有與主線程交互,則不會阻塞主線程執行,只有當主線程需要等待異步方法返回結果,而異步線程還沒執行完的情況下,會造成主線程阻塞。
並行計算(Parallel)#
System.Threading.Tasks.Parallel
是一個用於並行編程的靜態類。它提供了一組靜態方法,可以簡化並發執行 Task 的編碼過程,主要提供了 Invoke、For 和 ForEach 三個函數。
最常用的方法是 Parallel.For
和 Parallel.ForEach
。這兩個方法可以將迭代操作並行化,以便在多個線程上同時執行:
Action a = () => Console.WriteLine(DateTime.Now.ToString("HH:mm:ss"));
Parallel.Invoke(Enumerable.Range(1, 5).Select(x => a).ToArray());
Parallel.For(0, 5, i =>
{
// 循環體邏輯
Console.WriteLine("當前迭代: " + i);
});
List<int> numbers = new List<int> { 1, 2, 3, 4, 5 };
Parallel.ForEach(numbers, number =>
{
// 遍歷元素的邏輯
Console.WriteLine("當前數字: " + number);
});
Console.WriteLine("hi");
Output:
可以看到,每種方式的的執行都是無序的,並且 Parallel 啟動後是同步運行的,也就是會阻塞當前線程。
當使用 Task 時,通常會調用 Run 方法開啟異步任務,並可以使用 await 關鍵字來等待任務完成。這樣可以實現異步執行任務,並在需要時等待任務完成後繼續進行其他操作。
而 Parallel 類是為了簡化並行計算而設計的,它提供了一些靜態方法,如 Parallel.For 和 Parallel.ForEach,用於並行執行循環和遍歷操作。Parallel 類會自動將任務分配給多個線程,在多核 CPU 上同時執行,以達到並行計算的效果。
總結起來,Task 用於異步編程和任務協調,而 Parallel 類則用於簡化並行計算,兩者的使用方式和機制有所不同。
Parallel 的錯誤使用#
Parallel 的循環操作支持在每個任務啟動時進行初始化操作,結束時進行掃尾操作,並且允許監控任務的狀態。注意,之前關於 "任務" 的說法有誤,應該將其改為 "線程",比如以下代碼:
var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
var sum = 0;
Parallel.For(0, list.Count,
() =>
{
Console.WriteLine($"localInit i:1, ThreadId:{Environment.CurrentManagedThreadId}");
return 1;
},
(i, state, total) =>
{
Console.WriteLine($"body i:{i}, total:{total}, ThreadId:{Environment.CurrentManagedThreadId}");
total += i;
return total;
},
i =>
{
Console.WriteLine($"localFinally i:{i}, ThreadId:{Environment.CurrentManagedThreadId}");
Interlocked.Add(ref sum, i);
});
Console.WriteLine(sum);
先仔細看下 Parallel.For
的參數說明:
/// <summary>
/// 在並行執行的情況下,執行一個 for 循環。
/// </summary>
/// <typeparam name="TLocal">線程本地數據的類型。</typeparam>
/// <param name="fromInclusive">起始索引(包含)。</param>
/// <param name="toExclusive">終止索引(不包含)。</param>
/// <param name="localInit">一個函數委託,用於為每個線程返回本地數據的初始狀態。</param>
/// <param name="body">每次迭代調用的委託。</param>
/// <param name="localFinally">對每個線程的本地狀態執行的最後操作的委託。</param>
/// <returns>一個 <see cref="System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> 結構,
/// 包含關於循環的哪個部分已完成的信息。</returns>
/// <remarks>
/// <para>
/// 對於迭代範圍 [fromInclusive, toExclusive) 中的每個值,都会调用一次 <paramref name="body"/> 委託。
/// 它接收以下參數:迭代計數(一个 Int32)、可以用于提前退出循环的 <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> 實例,
/// 以及在同一線程上執行的迭代之間可以共享的某些本地狀態。
/// </para>
/// <para>
/// **對於參與循環執行的每個線程,都会调用一次 <paramref name="localInit"/> 委託,並返回每個線程的初始本地狀態。**
/// **這些初始狀態將傳遞給每個線程上的第一個 <paramref name="body"/> 調用。**
/// **然後,每個後續的 body 調用都返回一個可能被修改的狀態值,該狀態值傳遞給下一個 body 調用。**
/// **最後,在每個線程上的最後一個 body 調用返回一個狀態值,並將該值傳遞給 <paramref name="localFinally"/> 委託。**
/// **每個線程的本地狀態都会执行 <paramref name="localFinally"/> 委託一次,以执行最后的操作。**
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
int fromInclusive, int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
Parallel.For
采用並發的方式啟動 For 循環,循環體也就是 body 參數交給線程池去處理,最難理解的三個參數含義如下:
- localInit:每個新線程創建後都會先執行
localInit
進行初始化行為並返回初始狀態,換句話說,localInit 執行了多少次,就代表 Parallel 開啟了多少個線程 - body:循環體內容,初始狀態(localInit)將傳遞給每個線程上的第一個 body,每個後續的 body 調用都返回一個可能被修改的狀態值,該狀態值傳遞給下一個 body 調用 **。也就是說如果並發循環 6 次只創建了一个線程,這個線程會執行 6 次 body 參數,localInit 也只會執行一次 **,因為只有一個線程,body 的參數 (i, state, total) 分別代表(當前循環值的範圍 [0-list.Count), Parallel 當前狀態,localInit 返回的值)
- localFinally:每個線程上最後一個 body 執行後,執行一次,也可以說每個線程結束時執行一次,他的參數 i 是最後一個 body 返回的狀態值,換句話說,localFinally 執行了多少次,就代表 Parallel 開啟了多少個線程
// [0-list.Count) 的和
0+1+2+3+4+5=15
每個線程創建時都會執行一次 localInit,每個線程可能執行多個 body,每個線程的最後一個 body 執行後,在線程釋放之前會執行一次 localFinally,根據創建的線程數量,最後的 sum 值會有所不同:
線程數量 结果
1 16
2 17
3 18
4 19
5 20
6 21
換個清晰的例子:
var list = new List<string>() { "aa", "bb", "cc", "dd", "ee", "ff", "gg" };
var str = string.Empty;
Parallel.For(0, list.Count, () => "-", (i, state, total) => total += list[i], s =>
{
str += s;
Console.WriteLine("end:" + s);
});
Console.WriteLine(str);
根據結果可知,總共輸出了 4 次 "end",這意味著 7(list.Count)次並發循環共創建了 4 個線程,每個線程:
- 在創建後都會執行一次 localInit
- 只有第一個 body 在執行時能夠獲得 localInit 返回的狀態值。隨後的 body 執行時,其 total 值都是基於上一次 body 的返回值
- 每個線程的最後一個 body 執行後,在線程釋放之前會執行一次 localFinally,並將最後一個 body 的返回值傳遞給它
並行計算一定比串行快#
由於並行計算需要創建線程,而線程的創建和銷毀都需要時間和空間的開銷,當循環體執行時間非常短時(沒什麼耗時操作),並行的速度會比串行更慢,比如:
using System.Diagnostics;
var sw = Stopwatch.StartNew();
for (int i = 0; i < 2000; i++)
{
for (int j = 0; j < 10; j++)
{
var sum = i + j;
}
}
Console.WriteLine("串行循環耗時:" + sw.ElapsedMilliseconds);
sw.Restart();
Parallel.For(0, 2000, i =>
{
for (int j = 0; j < 10; j++)
{
var sum = i + j;
}
});
Console.WriteLine("並行循環耗時:" + sw.ElapsedMilliseconds);
但如果將循環體執行時間加長的話,並行循環就會優於串行循環:
for (int j = 0; j < 100000; j++)
所以只有當循環體執行時間較長時才考慮使用並行計算。
並行計算加鎖#
由於並行計算是多線程運行,如果需要訪問共享資源,就要加鎖以保證數據的一致性。加鎖適用於需要同步代碼或長時間佔用共享資源的情況。
在對整數變量進行原子操作時,可以使用 Interlocked.Add
方法,這極大的減少了同步的性能損耗。
var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
int sum = 0;
Parallel.For(0, list.Count, () => 1, (i, state, total) =>
{
total += i;
return total;
}, i => Interlocked.Add(ref sum, i));
Console.WriteLine(sum);
上面這段代碼中,如果結尾不進行原子操作,最後可能會導致匯編語言在最後 mov 操作時內存地址的對齊問題,而 Interlocked 解決了這樣的問題。同時 .NET 還提供了 [volatile](https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/volatile)
關鍵字來解決變量的原子性操作問題,但它不適用多線程場景:
var mc = new MyClass();
Parallel.For(0, 100000, i =>
{
mc.AddCount();
});
Console.WriteLine(mc.Count);
public class MyClass
{
public volatile int Count;
public void AddCount()
{
Count++;
}
}
以上輸出一定小於 100000,因為對共享資源訪問時沒有加鎖,導致輸出的結果不符合預期,在多線程訪問的場景下,使用 Interlocked 或 [lock](https://learn.microsoft.com/zh-cn/dotnet/csharp/language-reference/statements/lock)
語句保護訪問共享資源:
// 使用以下兩種方式中任意一種修改上面的代碼
// 方式一
public void AddCount()
{
Interlocked.Add(ref Count, 1);
//Count++;
}
// 方式二
Parallel.For(0, 100000, i =>
{
lock (mc)
{
mc.AddCount();
}
});
但這樣帶來了新的問題,由於同步鎖的存在,會增加系統開銷(CPU 時間和內存),線程切換時間等,也就是說,如果需要加鎖循環體內全部代碼,就完全沒必要使用並行計算了,因為這樣比串行計算還耗時。
PLINQ#
傳統的 LINQ 是單線程串行執行的,而 PLINQ 是 LINQ 模式的並行實現,也就是 Parallel LINQ。PLINQ 的實現幾乎都在 System.Linq.ParallelEnumerable 類中,它的執行模式:PLINQ 介紹 | Microsoft Learn,總結就是 PLINQ 內部會根據分析的結果,選擇並行或串行執行,以獲得最優的查詢速度。比如:
var list = Enumerable.Range(1, 6);
var query = from i in list
select i;
foreach (var i in query)
{
Console.WriteLine(i);
}
Console.WriteLine("----------");
var query2 = from i in list.AsParallel()
select i;
foreach (var i in query2)
{
Console.WriteLine(i);
}
可以看到 LINQ 順序輸出,而 PLINQ 無序輸出(並發多線程)。
在實際開發中,並行不一定總是比串行快,需要根據使用場景找到最佳的方式。
並行編程的異常處理#
考慮以下代碼是否會拋出異常,:
MyMethod();
static async Task MyMethod()
{
await Task.CompletedTask;
throw new Exception();
}
實際上不會拋出異常,因為 MyMethod 是一個異步方法,他在另外一個線程裡執行和發生異常,但由於它沒有與調用線程(主線程)進行交互,所以調用者並不知道他是否異常。
Task 中的異常處理#
如果 Task 可以進行交互,比如調用 Task 的 Wait、WaitAny、WaitAll 等阻塞方法,或者獲取 Task 的 Result 屬性時,可以捕獲到 Task 內發生的異常,異常類型為 AggregateException
,該異常是並行編程中最頂層的異常。
如果可以通過阻塞(同步)獲取,就使用 Task 的 Wait* 阻塞方法,或者使用 await Task,阻塞後同步代碼的異常需要通過 try/catch 來捕獲。
如果需要非阻塞式(異步)捕獲 Task 的異常就用 Task 的 ContinueWith 或事件通知(這個太麻煩了)。
Parallel 中的異常處理#
相較於 Task,Parallel 的異常處理要簡單很多,因為 Parallel 是同步運行的,即阻塞主線程,它裡面拋出的異常能直接被主線程捕獲:
using System.Collections.Concurrent;
// 線程安全的隊列
var exs = new ConcurrentQueue<Exception>();
try
{
Parallel.For(0, 2, i =>
{
try
{
throw new ArgumentException();
}
catch (Exception e)
{
exs.Enqueue(e);
throw new AggregateException(exs);
}
});
}
catch (AggregateException e)
{
foreach (var ex in e.InnerExceptions)
{
Console.WriteLine($"異常類型:{ex.GetType()},異常源:{ex.Source},異常信息:{ex.Message}");
}
}
System.Console.WriteLine(exs.Count);
參考#
- https://masuit.com/1201
- https://threads.whuanle.cn/ 比上面文章更深入和詳細
- https://learn.microsoft.com/zh-cn/dotnet/standard/parallel-programming/
- https://learn.microsoft.com/zh-cn/dotnet/standard/clr
- 多線程(Multithread) - 隨筆分類 - InCerry - 博客園 (cnblogs.com)
- ThreadPool 類 (System.Threading) | Microsoft Learn
- BackgroundWorker 類 (System.ComponentModel) | Microsoft Learn
- Task 類 (System.Threading.Tasks) | Microsoft Learn
- async - C# 參考 | Microsoft Learn
- await 運算符 - 異步等待任務完成 | Microsoft Learn
- C# 中的異步編程 | Microsoft Learn
- 基於任務的異步模式 (TAP):介紹和概述 | Microsoft Learn
- Parallel 類 (System.Threading.Tasks) | Microsoft Learn
- referencesource/mscorlib/system/threading/Tasks/Parallel.cs at master · microsoft/referencesource · GitHub
- volatile - C# 參考 | Microsoft Learn
- Interlocked 類 (System.Threading) | Microsoft Learn