myesn

myEsn2E9

hi
github

C#: .NET 並行程式設計(TPL)

介紹#

本文旨在深入了解多線程、異步、任務、和並行計算,它們統稱為並行編程(Task Parallel Library)

多線程和異步#

多線程和異步是兩個不同的概念,如果分不清就容易寫出以下錯誤代碼:

void button1_Click()
{
    new Thread(() =>
    {
        var client = new WebClient();
        var content = client.DownloadString("https://myesn.cn");
        Console.WriteLine(content);
    }).Start();
}

以上代碼在按鈕點擊時,創建一個線程去下載網頁內容,其目的是為了避免阻塞 UI 線程。

但這是一種低效的實現,要理解這一點,需要從計算機組成原理說起,在電腦主機的硬體裡,有很多硬體子系統具備 “IO 操作的 DMA(Direct Memory Access)模式”,即直接內存訪問在 .NET 中 CLR 所提供的異步編程模型就是讓我們充分利用硬體 DMA 功能來降低對 CPU 的壓力

上述代碼的圖形示意如下:
create a thread to download the web content

低效的原因是該 Thread 在執行內部代碼時會一直佔用 CPU 資源,直到其執行完畢。如果改用異步的方式實現:

void button1_Click()
{
    var client = new WebClient();
    client.DownloadStringCompleted += (sender, e) =>
    {
        Console.WriteLine(e.Result);
    };
    client.DownloadStringAsync(new Uri("https://myesn.cn"));
}

改造後的代碼採用了異步模式底層使用線程池管理(CLR Thread Pool)。異步操作啟動時,CLR 將下載網頁操作交給線程池中的線程執行。開始 IO 操作時,異步將工作線程還給線程池,不再佔用 CPU 資源。異步完成後,WebClient 會通知下載完成事件,使 CLR 響應異步操作完成。這樣,異步模式借助線程池大大節省了 CPU 資源。圖形示意如下:
use asynchrony to donwload web content

所以,多線程和異步的執行流程大致如下:
multi-threaded and asynchronous execution flow

兩者的適用場景:CPU 密集型用多線程,I/O 密集型用異步(讀寫和數據傳輸)。

任何與讀寫和數據傳輸有關的,就屬於 I/O 密集型,否則就是 CPU 密集型,也叫計算密集型。

線程同步#

在多線程環境中,線程同步是為了確保對共享資源的安全訪問,防止多個線程同時修改共享資源而導致數據不一致性或其他問題。通常使用鎖定機制來實現線程同步。

在面向對象語言中,數據類型可以分為值類型和引用類型。值類型包括整數、浮點數、構造體等,而引用類型則是指向對象的引用,如類、數組等。

在大多數情況下,我們可以在引用類型上實現線程同步,即通過鎖定共享資源的對象來確保多個線程對該資源安全訪問。這可以使用內置的鎖定機制(例如使用關鍵字 locksynchronized)或其他同步工具來實現。

然而,由於值類型的拷貝行為以及每個線程擁有自己的堆棧,值類型無法直接被鎖定和等待每個線程操作的是自己的值類型變量副本,而不會相互干擾

在 C# 中,使用微軟提供的關鍵字語法糖 locklock 關鍵字實際上是對 Monitor 類的簡化使用。lock 關鍵字會自動調用 Monitor.EnterMonitor.Exit 方法來對一個對象進行鎖定和解鎖操作。

信號同步是一種線程間通信的機制,用於在多個線程之間進行協調和同步操作。它可以確保線程在特定條件滿足時進行等待,以及在滿足條件時通知其他線程繼續執行。而信號同步機制中涉及到的類型都繼承自抽象類 WaitHandle ,它們的關係如下:

singnal synchronization

  • EventWaitHandle 是一個由操作系統內核產生的布爾值,表示阻塞狀態。調用 Set 方法可以將其置為 true (有信號 true ,無信號 false),解除線程阻塞。AutoResetEventManualResetEvent 都是 EventWaitHandle 的子類。
    • AutoResetEvent 在調用 Set 方法後,會自動將阻塞狀態重置為 false,只有一個等待線程會被喚醒
    • ManualResetEvent 在調用 Set 方法後,不會自動重置阻塞狀態,所有等待線程都會被喚醒,直到調用 Reset 方法將阻塞狀態重置為 false
  • Semaphore 維護著一個由系統內核產生的整型變量作為計數器,如果計數器值為 0,則表示等待;如果大於 0,則解除阻塞,並將計數器遞減。初始化時可以限制最多能夠等待的線程數量。
  • Mutex 解決的是跨應用程序域的線程阻塞和解鎖的能力。它也維護著一個由系統內核產生的標誌位,用於同步對共享資源的訪問。只有一個線程能夠獲取到 Mutex 的鎖進行訪問,其他線程需要等待鎖釋放。

使用 AutoResetEvent 舉例如下:

var test = new Test();
test.StartThread();

Console.ReadKey();

test.SendSignal();

Console.ReadKey();

class Test
{
  private AutoResetEvent _autoResetEvent { get; set; } = new AutoResetEvent(false);

  public void StartThread()
  {
    new Thread(() =>
    {
      Console.WriteLine("線程1開啟,等待信號...");
      _autoResetEvent.WaitOne(); //todo:處理一些複雜工作
      Console.WriteLine("線程1繼續工作...");
    }).Start();

    new Thread(() =>
    {
        Console.WriteLine("線程2開啟,等待信號...");
        _autoResetEvent.WaitOne(); //todo:處理一些複雜工作
        Console.WriteLine("線程2繼續工作...");
    }).Start();
  }

  public void SendSignal() => _autoResetEvent.Set();
}

首先創建一個 AutoResetEvent 實例並設置初始值為 false 代表無信號,然後調用 StartThread 函數啟動兩個線程,每個線程內部都等待一個 signal(信號),然後調用 SendSignal 函數其內部使用 Set() 發送一個信號量,這時會發現只有一個等待線程被喚醒,要想所有等待線程都被喚醒,就使用 ManualResetEvent

只要是引用類型,就可以隨便加鎖嗎?#

加鎖是一種線程同步機制,通過鎖住共享資源來確保在多線程訪問時只有一個線程能夠佔用。但並不是所有的對象都可以被用作鎖

選擇鎖對象時,需要注意:

  1. 鎖對象應該是在多個線程中可見的同一對象
  2. 在非靜態方法中,靜態變量不應該作為鎖對象
  3. 值類型不能作為鎖對象,值類型無法直接被鎖定和等待每個線程操作的是自己的值類型變量副本,而不會相互干擾
  4. 避免將字符串作為鎖對象,字符串在內存中作為常量存在,當多個變量被賦值相同的字符串,它們引用的是同一塊內存空間
  5. 降低鎖對象的可見性,字符串是可見範圍最廣的鎖對象,其次為 typeof(class) 的返回結果,因為它是 class 所有實例都指向 typeof 返回的結果

類(class)的靜態方法應當保證線程安全,非靜態方法不需要保證線程安全。

一般來說,鎖對象也不應該是一個公共變量或屬性。 .NET 一些常用集合類型(System.Collections.ICollection 的實現)比如 List 等,都提供了公有屬性 SyncRoot 讓我們可以實現線程安全的的集合操作,但集合操作的大部分應用場景是單線程操作,線程同步本身比較耗時,這個字段公開是為了讓調用者決定它操作時是否需要線程安全,但一般來說在多線程的情況下,還是建議使用線程安全的集合(System.Collections.Concurrent 命名空間下):ConcurrentBag、ConcurrentDictionary 等。

線程的 IsBackground#

在 .NET 中線程可以設置為前台運行(默認)或後台運行,每個線程都有 IsBackground 屬性:

  • 前台線程(false 默認值):當所有的前台線程執行完成後,應用程序會立即退出,通常,我們將需要完成的關鍵任務設置為前台線程,確保它們被完整地執行
  • 後台線程(true):在應用程序主線程結束時也會隨之結束的線程,當只剩下後台線程運行時,應用程序會立即退出,不會等待後台線程完成。通常,將一些非關鍵的、輔助性的任務設置為後台線程可以使得應用程序更快地退出,例如日誌記錄、監控等。

線程並不會立即開始#

大多數操作系統都不是實時操作系統,包括 Windows。線程的執行不是立即發生的,而是由操作系統根據自己的調度算法來決定何時執行哪個線程。每個線程被分配一小段 CPU 時間來執行工作,因此即使有多個線程同時運行,也會感覺它們幾乎在同時執行。系統會在適當的時機根據算法決定下一個時間點去調度哪個線程。

線程不是編程語言自帶的東西,它的調度是一個非常複雜的過程,線程之間的切換需要花一定時間和空間,並且,它不實時。比如:

for (int i = 0; i < 10; i++)
{
	new Thread(() =>
	{
		Console.WriteLine(i);
	}).Start();
}

Output:
Untitled

從輸出的結果可以看到,線程並不是立即啟動的(多個線程打印了相同的 i 值,比如 5),在循環內直接啟動線程,每個線程都會共享相同的變量 i。當某個線程開始執行時,可能會出現其他線程已經修改了 i 的情況,因此多個線程可能會訪問到相同的 i 值。

這是因為線程在不同的 CPU 核心上運行,而 CPU 與內存之間具有 寄存器、L1 Cache、L2 Cache、L3 Cache、內存 多級結構,如果不對內存進行鎖定,那麼在一個 CPU 核心修改了變量的值但是還沒有寫回到內存中,而另一個 CPU 讀取了舊的值時,便會出現髒讀。

如果想按照預期結果執行(每個線程負責接收屬於自己的 i 值),通過將啟動線程的行為封裝到函數中,在每次調用函數時,會將當前的 i 作為參數傳遞給該函數,創建一個新的局部變量 i。這樣每個線程都有自己獨立的局部變量 i,不會被其他線程影響。因此,每個線程可以得到預期的不同 i 值:

for (int i = 0; i < 10; i++)
{
    StartThread(i);
}

void StartThread(int i)
{
    new Thread(() =>
    {
        Console.WriteLine(i);
    }).Start();
}

線程的優先級(ThreadPriority)#

線程在 C# 中具有不同的優先級(ThreadPriority),我們啟動的所有 Thread,包括 ThreadPool 和 Task,默認是 Normal 級別。優先級涉及到操作系統對線程的調度,Windows 系統是一個基於線程優先級的搶佔式調度模式,優先級高的總是能獲取更多的 CPU 時間,並且在已經就緒時(表示線程已經準備好開始執行,並等待操作系統進行線程調度,例如線程已創建並啟動,沒有被阻塞或掛起),總是會優先執行

一般不建議修改線程的優先級,除非是非常關鍵的線程,高優先級的線程應該具備運行時間短,並立刻進入等待狀態的特性,否則會長時間佔用 CPU 資源導致各種問題。

取消運行中的線程(Thread Cancel)#

在一定時間後,取消正在執行的線程,有聯眾:

  1. 線程不能立即啟動,也不能立即停止,無論採用哪種方式通知線程停止,它都會忙完最緊要的事情,而後在它覺得合適時取消線程。比如 Thread.Abort ,如果線程執行的是一段非托管代碼,就不會拋出線程取消異常,只有當代碼回到 CLR 中,才會引發線程取消異常,當然,異常也不是立即引發的。
  2. 取消線程依賴於線程能否響應停止請求。線程應該提供 Canceled 接口,並在工作期間檢測 Canceled 狀態。只有在檢測到 Canceled 為 true 時,線程才會退出

.NET 提供了標準的取消模式:協作式取消(Cooperative Cancellation),就是上面第 2 點提到的機制,比如:

var cts = new CancellationTokenSource();
new Thread(() =>
{
    while (true)
    {
        if (cts.IsCancellationRequested)
        {
            Console.WriteLine("線程被取消");
            break;
        }

        Thread.Sleep(100);
    }
}).Start();

Console.ReadKey();
cts.Cancel();

主線程通過 CancellationTokenSourceCancel 方法通知工作線程退出,工作線程以固定頻率檢測外界是否有 Cancel 信號傳入並在合適的時機退出。工作線程自身起到了主要作用並確保正確停止。

CancellationTokenSourceToken 有一個 Register 方法在 cts.Cancel() 時被觸發:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("cts canceled"));

Console.ReadKey();
cts.Cancel();

ThreadPool 和 Thread 的取消模式相同。

控制線程數量#

通過任務管理器 > 性能 > CPU 展示的數據,可以算出每個進程平均擁有 10 個線程左右,所以每個程序都不會啟動太多線程:
Untitled 1

在網絡編程中使用多線程為每個 socket 連接開啟一個線程進行監聽請求時,如果用戶數增多,線程數量也會增加。當線程數量達到一定數量時,會導致計算機資源管理不過來。每個線程需要分配一定的內存空間,而 32 位系統的內存限制通常在 2GB - 3GB 左右,當線程數量達到一定數量時,會耗盡全部內存。此外,過多的線程也會導致 CPU 在線程之間切換的開銷過大,損耗大量的 CPU 時間。對於像 Socket 這類 I/O 密集型的應用,更適合使用異步方式處理

創建過多的線程會導致系統資源的過度佔用,會嚴重影響性能甚至導致系統崩潰。並且線程切換開銷過大,即線程很難得到足夠的 CPU 時間,表現就是需要等待相當長的時間才能執行線程內的操作。

在實際開發中,避免創建過多的線程,合理利用線程池或異步方式來處理任務,以提高性能並減少資源消耗,異步和線程池技術能夠高效管理大量線程,實際工作的線程數量較少。

線程池#

線程的空間開銷主要來自:

  1. 線程內核對象(Thread Kernel Object):每個線程都會創建一個這樣的對象,它主要包含線程上下文信息,佔用的內存在 700 字節左右
  2. 線程環境塊(Thread Environment Block):佔用 4KB 內存
  3. 用戶模式棧(User Mode Stack),即線程棧:線程棧用於保存方法的參數、局部變量和返回值。每個線程棧佔用 1MB 的內存。要用完這些內存很簡單,寫一個不能結束的遞歸方法,讓方法參數和返回值不停地消耗內存,很快就會發生 OutOfMemoryException
  4. 內核模式棧(Kernel Mode Stack):當調用操作系統的內核模式函數時,系統會將函數參數從用戶模式棧複製到內核模式棧。會佔用 12KB 內存

線程的時間開銷來自:

  1. 線程創建的時候,系統相繼初始化以上這些(空間開銷)內存空間
  2. 接著 CLR 會調用所有加載 DLL 的 DLLMain 方法,並傳遞連接標誌(線程終止的時候,也會調用 DLL 的 DLLMain 方法,並傳遞分離標誌)
  3. 線程上下文切換:一個系統中會加載很多的進程,而一個進程又包含若干個線程。但是一個 CPU 在任何時候都只能有一個線程在執行。為了讓每個線程看上去都在運行,系統會不斷地切換 “線程上下文”:每個線程大概得到幾十毫秒的執行時間片,然後就會切換到下一個線程了。這個過程大概又分為以下 5 個步驟:
    1. 進入內核模式
    2. 將上下文信息(主要是一些 CPU 寄存器信息)保存到正在執行的線程內核對象上
    3. 系統獲取一個 Spinlock,並確定下一個要執行的線程,然後釋放 Spinlock。如果下一個線程不在同一個進程內,則需要進行虛擬地址交換
    4. 從將被執行的線程內核對象上載入上下文信息
    5. 離開內核模式

線程的創建和銷毀有時間和空間的代價,為了管理線程的使用,微軟提供了線程池技術。線程池會在工作完成後將線程回收到池中,以供其他任務使用,線程的創建和銷毀由 CLR 的算法來決定。在實際項目中,更推薦使用線程池來管理線程,可以使用 ThreadPoolBackgroundWorker 這兩個類來實現,使用簡單方便,比如:

using System.ComponentModel;

// ThreadPool
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("from ThreadPool"));

// BackgroundWorker
var bw = new BackgroundWorker();
bw.DoWork += (object? sender, DoWorkEventArgs e) => Console.WriteLine("from BackgroundWorker");
bw.RunWorkerAsync();

Console.ReadKey();

ThreadPool 和 BackgroundWorker 是兩種不同的線程處理技術,它們在使用場景和特點上有一些區別:

  1. ThreadPool(線程池)線程池是一種管理和復用線程的機制,通過預先創建一組線程,並對其進行調度和管理,以便在需要執行任務時分配可用線程。ThreadPool 適用於並行執行大量短期小任務的情況,可以避免頻繁地創建和銷毀線程的開銷。可以使用 ThreadPool.QueueUserWorkItem 方法或 Task.Run 方法將工作項添加到線程池中
  2. BackgroundWorker(後台工作者)BackgroundWorker 是一個封裝了異步操作的組件它簡化了在 WinForms 和 WPF 應用程序中使用後台線程執行長時間運行任務的過程。BackgroundWorker 提供了進度報告和完成事件,可以輕鬆地在 UI 線程和後台線程之間進行通信。它適用於需要在後台線程執行較長時間運行的任務,並且還需要更新 UI 的情況

任務#

**[Task](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=net-7.0) 是 .NET 4+ 提供的一種用於異步編程的高級抽象 **。它可以表示一個異步操作或一段可調度的代碼,任務可以被分配給線程池中的線程來執行,Task 是超越 ThreadPool 和 BackgroundWorker 的存在,為 ThreadPool 提供了更多的 API 來管理線程,可以使用 Status 屬性以及 IsCanceled、 IsCompleted和 IsFaulted 屬性來確定任務的狀態:

// 創建一個 CancellationTokenSource 用於模擬取消 Task
var cts = new CancellationTokenSource();
Task.Run(() =>
{
    Console.WriteLine("我是異步線程...");
}).ContinueWith(t =>
{
    if (t.IsCanceled)
    {
        Console.WriteLine("線程被取消了");
    }
 
    if (t.IsFaulted)
    {
        Console.WriteLine("發生異常而被取消了");
    }
 
    if (t.IsCompleted)
    {
        Console.WriteLine("線程成功執行完成了");
    }
});

Console.ReadKey();
// 取消 Task
// cts.Cancel();

ContinueWith 天然支持任務完成的通知、數據返回、任務取消、異常等情況的處理。Task 的 Result 屬性可以拿到線程執行完返回的值,同時阻塞線程直到拿到返回的結果。

一般使用 Task.Factory.StartNew 來實例化和啟動 Task,Task.Factory.ContinueWhenAll(等待所有) 和 Task.Factory.ContinueWhenAny(等待其一) 用來操作多個 Task 的執行結果。

如果想讓 Task 變為同步,只需調用 Wait 方法即可。

async / await#

在方法或表達式上使用 async 修飾符,則稱為異步方法,且異步方法的返回類型必須是 TaskTask<T> 或者 ValueTask<T>await 運算符 等待一個異步操作完成,並且可以暫時中斷當前方法的執行,直到異步操作完成後再繼續執行剩下的代碼,它倆幾乎是成對出現。

同步等待和異步等待#

思考以下代碼,最終輸出的結果大約是多少:

using System.Diagnostics;

var sw = Stopwatch.StartNew();
var task = MyMethod();
Thread.Sleep(4000);
var result = task.Result;
Console.WriteLine(sw.ElapsedMilliseconds);

static async Task<string> MyMethod()
{
    await Task.Delay(5000);
    return "aaa";
}

這段代碼中,首先調用了異步方法 MyMethod,開啟了一個線程。在這個線程中,使用 await 異步等待了 5000ms。同時,在主線程中使用同步等待了 4000ms。

在主線程等待了 4000ms 後,又有一個變量在等待異步線程返回一個值。在這之前,主線程和異步線程都在同時進行等待。因此,在等待異步方法返回結果的過程中,還需要等待額外的 1000ms

最後,當 MyMethod 方法返回結果 "aaa" 後,輸出了程序執行的時間為 5000ms。
Untitled 2

在異步線程執行之後,如果沒有與主線程交互,則不會阻塞主線程執行,只有當主線程需要等待異步方法返回結果,而異步線程還沒執行完的情況下,會造成主線程阻塞。

並行計算(Parallel)#

System.Threading.Tasks.Parallel 是一個用於並行編程的靜態類。它提供了一組靜態方法,可以簡化並發執行 Task 的編碼過程,主要提供了 Invoke、For 和 ForEach 三個函數。

最常用的方法是 Parallel.ForParallel.ForEach。這兩個方法可以將迭代操作並行化,以便在多個線程上同時執行:

Action a = () => Console.WriteLine(DateTime.Now.ToString("HH:mm:ss"));
Parallel.Invoke(Enumerable.Range(1, 5).Select(x => a).ToArray());

Parallel.For(0, 5, i =>
{
    // 循環體邏輯
    Console.WriteLine("當前迭代: " + i);
});

List<int> numbers = new List<int> { 1, 2, 3, 4, 5 };
Parallel.ForEach(numbers, number =>
{
    // 遍歷元素的邏輯
    Console.WriteLine("當前數字: " + number);
});

Console.WriteLine("hi");

Output:
Untitled 3

可以看到,每種方式的的執行都是無序的,並且 Parallel 啟動後是同步運行的,也就是會阻塞當前線程

當使用 Task 時,通常會調用 Run 方法開啟異步任務,並可以使用 await 關鍵字來等待任務完成。這樣可以實現異步執行任務,並在需要時等待任務完成後繼續進行其他操作。

而 Parallel 類是為了簡化並行計算而設計的,它提供了一些靜態方法,如 Parallel.For 和 Parallel.ForEach,用於並行執行循環和遍歷操作。Parallel 類會自動將任務分配給多個線程,在多核 CPU 上同時執行,以達到並行計算的效果。

總結起來,Task 用於異步編程和任務協調,而 Parallel 類則用於簡化並行計算,兩者的使用方式和機制有所不同

Parallel 的錯誤使用#

Parallel 的循環操作支持在每個任務啟動時進行初始化操作,結束時進行掃尾操作,並且允許監控任務的狀態。注意,之前關於 "任務" 的說法有誤,應該將其改為 "線程",比如以下代碼:

var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
var sum = 0;

Parallel.For(0, list.Count,
() =>
{
    Console.WriteLine($"localInit i:1, ThreadId:{Environment.CurrentManagedThreadId}");

    return 1;
},
(i, state, total) =>
{
    Console.WriteLine($"body i:{i}, total:{total}, ThreadId:{Environment.CurrentManagedThreadId}");
    total += i;
    return total;
},
i =>
{
    Console.WriteLine($"localFinally i:{i}, ThreadId:{Environment.CurrentManagedThreadId}");
    Interlocked.Add(ref sum, i);
});
Console.WriteLine(sum);

先仔細看下 Parallel.For 的參數說明:

/// <summary>
/// 在並行執行的情況下,執行一個 for 循環。
/// </summary>
/// <typeparam name="TLocal">線程本地數據的類型。</typeparam>
/// <param name="fromInclusive">起始索引(包含)。</param>
/// <param name="toExclusive">終止索引(不包含)。</param>
/// <param name="localInit">一個函數委託,用於為每個線程返回本地數據的初始狀態。</param>
/// <param name="body">每次迭代調用的委託。</param>
/// <param name="localFinally">對每個線程的本地狀態執行的最後操作的委託。</param>
/// <returns>一個 <see cref="System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> 結構,
/// 包含關於循環的哪個部分已完成的信息。</returns>
/// <remarks>
/// <para>
/// 對於迭代範圍 [fromInclusive, toExclusive) 中的每個值,都会调用一次 <paramref name="body"/> 委託。
/// 它接收以下參數:迭代計數(一个 Int32)、可以用于提前退出循环的 <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> 實例,
/// 以及在同一線程上執行的迭代之間可以共享的某些本地狀態。
/// </para>
/// <para>
/// **對於參與循環執行的每個線程,都会调用一次 <paramref name="localInit"/> 委託,並返回每個線程的初始本地狀態。**
/// **這些初始狀態將傳遞給每個線程上的第一個 <paramref name="body"/> 調用。**
/// **然後,每個後續的 body 調用都返回一個可能被修改的狀態值,該狀態值傳遞給下一個 body 調用。**
/// **最後,在每個線程上的最後一個 body 調用返回一個狀態值,並將該值傳遞給 <paramref name="localFinally"/> 委託。**
/// **每個線程的本地狀態都会执行 <paramref name="localFinally"/> 委託一次,以执行最后的操作。**
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
            int fromInclusive, int toExclusive,
            Func<TLocal> localInit,
            Func<int, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal> localFinally)

Parallel.For 采用並發的方式啟動 For 循環,循環體也就是 body 參數交給線程池去處理,最難理解的三個參數含義如下:

  • localInit:每個新線程創建後都會先執行 localInit 進行初始化行為並返回初始狀態,換句話說,localInit 執行了多少次,就代表 Parallel 開啟了多少個線程
  • body:循環體內容,初始狀態(localInit)將傳遞給每個線程上的第一個 body,每個後續的 body 調用都返回一個可能被修改的狀態值,該狀態值傳遞給下一個 body 調用 **。也就是說如果並發循環 6 次只創建了一个線程,這個線程會執行 6 次 body 參數,localInit 也只會執行一次 **,因為只有一個線程,body 的參數 (i, state, total) 分別代表(當前循環值的範圍 [0-list.Count), Parallel 當前狀態,localInit 返回的值)
  • localFinally:每個線程上最後一個 body 執行後,執行一次,也可以說每個線程結束時執行一次,他的參數 i 是最後一個 body 返回的狀態值,換句話說,localFinally 執行了多少次,就代表 Parallel 開啟了多少個線程
// [0-list.Count) 的和
0+1+2+3+4+5=15

每個線程創建時都會執行一次 localInit,每個線程可能執行多個 body,每個線程的最後一個 body 執行後,在線程釋放之前會執行一次 localFinally,根據創建的線程數量,最後的 sum 值會有所不同:
線程數量  结果
1         16
2         17
3         18
4         19
5         20
6         21

換個清晰的例子:

var list = new List<string>() { "aa", "bb", "cc", "dd", "ee", "ff", "gg" };
var str = string.Empty;
Parallel.For(0, list.Count, () => "-", (i, state, total) => total += list[i], s =>
{
    str += s;
    Console.WriteLine("end:" + s);
});
Console.WriteLine(str);

Untitled 4
根據結果可知,總共輸出了 4 次 "end",這意味著 7(list.Count)次並發循環共創建了 4 個線程,每個線程:

  1. 在創建後都會執行一次 localInit
  2. 只有第一個 body 在執行時能夠獲得 localInit 返回的狀態值。隨後的 body 執行時,其 total 值都是基於上一次 body 的返回值
  3. 每個線程的最後一個 body 執行後,在線程釋放之前會執行一次 localFinally,並將最後一個 body 的返回值傳遞給它

並行計算一定比串行快#

由於並行計算需要創建線程,而線程的創建和銷毀都需要時間和空間的開銷,當循環體執行時間非常短時(沒什麼耗時操作),並行的速度會比串行更慢,比如:

using System.Diagnostics;

var sw = Stopwatch.StartNew();
for (int i = 0; i < 2000; i++)
{
    for (int j = 0; j < 10; j++)
    {
        var sum = i + j;
    }
}

Console.WriteLine("串行循環耗時:" + sw.ElapsedMilliseconds);

sw.Restart();
Parallel.For(0, 2000, i =>
{
    for (int j = 0; j < 10; j++)
    {
        var sum = i + j;
    }
});
Console.WriteLine("並行循環耗時:" + sw.ElapsedMilliseconds);

Untitled 5
但如果將循環體執行時間加長的話,並行循環就會優於串行循環

for (int j = 0; j < 100000; j++)

Untitled 6
所以只有當循環體執行時間較長時才考慮使用並行計算。

並行計算加鎖#

由於並行計算是多線程運行,如果需要訪問共享資源,就要加鎖以保證數據的一致性。加鎖適用於需要同步代碼或長時間佔用共享資源的情況。

在對整數變量進行原子操作時,可以使用 Interlocked.Add 方法,這極大的減少了同步的性能損耗。

var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
int sum = 0;
Parallel.For(0, list.Count, () => 1, (i, state, total) =>
{
    total += i;
    return total;
}, i => Interlocked.Add(ref sum, i));
Console.WriteLine(sum);

上面這段代碼中,如果結尾不進行原子操作,最後可能會導致匯編語言在最後 mov 操作時內存地址的對齊問題,而 Interlocked 解決了這樣的問題。同時 .NET 還提供了 [volatile](https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/volatile) 關鍵字來解決變量的原子性操作問題,但它不適用多線程場景:

var mc = new MyClass();
Parallel.For(0, 100000, i =>
{
    mc.AddCount();
});
Console.WriteLine(mc.Count);

public class MyClass
{
    public volatile int Count;

    public void AddCount()
    {
        Count++;
    }
}

以上輸出一定小於 100000,因為對共享資源訪問時沒有加鎖,導致輸出的結果不符合預期,在多線程訪問的場景下,使用 Interlocked 或 [lock](https://learn.microsoft.com/zh-cn/dotnet/csharp/language-reference/statements/lock) 語句保護訪問共享資源:

// 使用以下兩種方式中任意一種修改上面的代碼

// 方式一
public void AddCount()
{
    Interlocked.Add(ref Count, 1);
    //Count++;
}

// 方式二
Parallel.For(0, 100000, i =>
{
    lock (mc)
    {
        mc.AddCount();
    }
});

但這樣帶來了新的問題,由於同步鎖的存在,會增加系統開銷(CPU 時間和內存),線程切換時間等,也就是說,如果需要加鎖循環體內全部代碼,就完全沒必要使用並行計算了,因為這樣比串行計算還耗時。

PLINQ#

傳統的 LINQ 是單線程串行執行的,而 PLINQ 是 LINQ 模式的並行實現,也就是 Parallel LINQ。PLINQ 的實現幾乎都在 System.Linq.ParallelEnumerable 類中,它的執行模式:PLINQ 介紹 | Microsoft Learn,總結就是 PLINQ 內部會根據分析的結果,選擇並行或串行執行,以獲得最優的查詢速度。比如:

var list = Enumerable.Range(1, 6);
var query = from i in list
            select i;
foreach (var i in query)
{
    Console.WriteLine(i);
}

Console.WriteLine("----------");
var query2 = from i in list.AsParallel()
             select i;
foreach (var i in query2)
{
    Console.WriteLine(i);
}

Untitled 7
可以看到 LINQ 順序輸出,而 PLINQ 無序輸出(並發多線程)。

在實際開發中,並行不一定總是比串行快,需要根據使用場景找到最佳的方式

並行編程的異常處理#

考慮以下代碼是否會拋出異常,:

MyMethod();

static async Task MyMethod()
{
  await Task.CompletedTask;
  throw new Exception();
}

實際上不會拋出異常,因為 MyMethod 是一個異步方法,他在另外一個線程裡執行和發生異常,但由於它沒有與調用線程(主線程)進行交互,所以調用者並不知道他是否異常。

Task 中的異常處理#

如果 Task 可以進行交互,比如調用 Task 的 Wait、WaitAny、WaitAll 等阻塞方法,或者獲取 Task 的 Result 屬性時,可以捕獲到 Task 內發生的異常,異常類型為 AggregateException,該異常是並行編程中最頂層的異常。

如果可以通過阻塞(同步)獲取,就使用 Task 的 Wait* 阻塞方法,或者使用 await Task,阻塞後同步代碼的異常需要通過 try/catch 來捕獲。

如果需要非阻塞式(異步)捕獲 Task 的異常就用 Task 的 ContinueWith 或事件通知(這個太麻煩了)。

Parallel 中的異常處理#

相較於 Task,Parallel 的異常處理要簡單很多,因為 Parallel 是同步運行的,即阻塞主線程,它裡面拋出的異常能直接被主線程捕獲:

using System.Collections.Concurrent;

// 線程安全的隊列
var exs = new ConcurrentQueue<Exception>();
try
{
    Parallel.For(0, 2, i =>
    {
        try
        {
            throw new ArgumentException();
        }
        catch (Exception e)
        {
            exs.Enqueue(e);
            throw new AggregateException(exs);
        }
    });
}
catch (AggregateException e)
{
    foreach (var ex in e.InnerExceptions)
    {
        Console.WriteLine($"異常類型:{ex.GetType()},異常源:{ex.Source},異常信息:{ex.Message}");
    }
}

System.Console.WriteLine(exs.Count);

參考#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。