myesn

myEsn2E9

hi
github

C#: .NET 并行编程(TPL)

简介#

本文旨在深入了解多线程、异步、任务、和并行计算,它们统称为并行编程(Task Parallel Library)

多线程和异步#

多线程和异步是两个不同的概念,如果分不清就容易写出以下错误代码:

void button1_Click()
{
    new Thread(() =>
    {
        var client = new WebClient();
        var content = client.DownloadString("https://myesn.cn");
        Console.WriteLine(content);
    }).Start();
}

以上代码在按钮点击时,创建一个线程去下载网页内容,其目的是为了避免阻塞 UI 线程。

但这是一种低效的实现,要理解这一点,需要从计算机组成原理说起,在电脑主机的硬件里,有很多硬件子系统具备 “IO 操作的 DMA(Direct Memory Access)模式”,即直接内存访问在 .NET 中 CLR 所提供的异步编程模型就是让我们充分利用硬件 DMA 功能来降低对 CPU 的压力

上述代码的图形示意如下:
create a thread to download the web content

低效的原因是该 Thread 在执行内部代码时会一直占用 CPU 资源,直到其执行完毕。如果改用异步的方式实现:

void button1_Click()
{
    var client = new WebClient();
    client.DownloadStringCompleted += (sender, e) =>
    {
        Console.WriteLine(e.Result);
    };
    client.DownloadStringAsync(new Uri("https://myesn.cn"));
}

改造后的代码采用了异步模式底层使用线程池管理(CLR Thread Pool)。异步操作启动时,CLR 将下载网页操作交给线程池中的线程执行。开始 IO 操作时,异步将工作线程还给线程池,不再占用 CPU 资源。异步完成后,WebClient 会通知下载完成事件,使 CLR 响应异步操作完成。这样,异步模式借助线程池大大节约了 CPU 资源。图形示意如下:
use asynchrony to donwload web content

所以,多线程和异步的执行流程大致如下:
multi-threaded and asynchronous execution flow

两者的适用场景:CPU 密集型用多线程,I/O 密集型用异步(读写和数据传输)。

任何与读写和数据传输有关的,就属于 I/O 密集型,否则就是 CPU 密集型,也叫计算密集型。

线程同步#

在多线程环境中,线程同步是为了确保对共享资源的安全访问,防止多个线程同时修改共享资源而导致数据不一致性或其他问题。通常使用锁定机制来实现线程同步。

在面向对象语言中,数据类型可以分为值类型和引用类型。值类型包括整数、浮点数、结构体等,而引用类型则是指向对象的引用,如类、数组等。

在大多数情况下,我们可以在引用类型上实现线程同步,即通过锁定共享资源的对象来确保多个线程对该资源安全访问。这可以使用内置的锁定机制(例如使用关键字 locksynchronized)或其他同步工具来实现。

然而,由于值类型的拷贝行为以及每个线程拥有自己的堆栈,值类型无法直接被锁定和等待每个线程操作的是自己的值类型变量副本,而不会相互干扰

在 C# 中,使用微软提供的关键字语法糖 locklock 关键字实际上是对 Monitor 类的简化使用。lock 关键字会自动调用 Monitor.EnterMonitor.Exit 方法来对一个对象进行锁定和解锁操作。

信号同步是一种线程间通信的机制,用于在多个线程之间进行协调和同步操作。它可以确保线程在特定条件满足时进行等待,以及在满足条件时通知其他线程继续执行。而信号同步机制中涉及到的类型都继承自抽象类 WaitHandle ,它们的关系如下:

singnal synchronization

  • EventWaitHandle 是一个由操作系统内核产生的布尔值,表示阻塞状态。调用 Set 方法可以将其置为 true (有信号 true ,无信号 false),解除线程阻塞。AutoResetEventManualResetEvent 都是 EventWaitHandle 的子类。
    • AutoResetEvent 在调用 Set 方法后,会自动将阻塞状态重置为 false,只有一个等待线程会被唤醒
    • ManualResetEvent 在调用 Set 方法后,不会自动重置阻塞状态,所有等待线程都会被唤醒,直到调用 Reset 方法将阻塞状态重置为 false
  • Semaphore 维护着一个由系统内核产生的整型变量作为计数器,如果计数器值为 0,则表示等待;如果大于 0,则解除阻塞,并将计数器递减。初始化时可以限制最多能够等待的线程数量。
  • Mutex 解决的是跨应用程序域的线程阻塞和解锁的能力。它也维护着一个由系统内核产生的标志位,用于同步对共享资源的访问。只有一个线程能够获取到 Mutex 的锁进行访问,其他线程需要等待锁释放。

使用 AutoResetEvent 举例如下:

var test = new Test();
test.StartThread();

Console.ReadKey();

test.SendSignal();

Console.ReadKey();

class Test
{
  private AutoResetEvent _autoResetEvent { get; set; } = new AutoResetEvent(false);

  public void StartThread()
  {
    new Thread(() =>
    {
      Console.WriteLine("线程1开启,等待信号...");
      _autoResetEvent.WaitOne(); //todo:处理一些复杂工作
      Console.WriteLine("线程1继续工作...");
    }).Start();

    new Thread(() =>
    {
        Console.WriteLine("线程2开启,等待信号...");
        _autoResetEvent.WaitOne(); //todo:处理一些复杂工作
        Console.WriteLine("线程2继续工作...");
    }).Start();
  }

  public void SendSignal() => _autoResetEvent.Set();
}

首先创建一个 AutoResetEvent 实例并设置初始值为 false 代表无信号,然后调用 StartThread 函数启动两个线程,每个线程内部都等待一个 signal(信号),然后调用 SendSignal 函数其内部使用 Set() 发送一个信号量,这时会发现只有一个等待线程被唤醒,要想所有等待线程都被唤醒,就使用 ManualResetEvent

只要是引用类型,就可以随便加锁吗?#

加锁是一种线程同步机制,通过锁住共享资源来确保在多线程访问时只有一个线程能够占用。但并不是所有的对象都可以被用作锁

选择锁对象时,需要注意:

  1. 锁对象应该是在多个线程中可见的同一对象
  2. 在非静态方法中,静态变量不应该作为锁对象
  3. 值类型不能作为锁对象,值类型无法直接被锁定和等待每个线程操作的是自己的值类型变量副本,而不会相互干扰
  4. 避免将字符串作为锁对象,字符串在内存中作为常量存在,当多个变量被赋值相同的字符串,它们引用的是同一块内存空间
  5. 降低锁对象的可见性,字符串是可见范围最广的锁对象,其次为 typeof(class) 的返回结果,因为它是 class 所有实例都指向 typeof 返回的结果

类(class)的静态方法应当保证线程安全,非静态方法不需要保证线程安全。

一般来说,锁对象也不应该是一个公共变量或属性。 .NET 一些常用集合类型(System.Collections.ICollection 的实现)比如 List 等,都提供了公有属性 SyncRoot 让我们可以实现线程安全的的集合操作,但集合操作的大部分应用场景是单线程操作,线程同步本身比较耗时,这个字段公开是为了让调用者决定它操作时是否需要线程安全,但一般来说在多线程的情况下,还是建议使用线程安全的集合(System.Collections.Concurrent 命名空间下):ConcurrentBag、ConcurrentDictionary 等。

线程的 IsBackground#

在 .NET 中线程可以设置为前台运行(默认)或后台运行,每个线程都有 IsBackground 属性:

  • 前台线程(false 默认值):当所有的前台线程执行完成后,应用程序会立即退出,通常,我们将需要完成的关键任务设置为前台线程,确保它们被完整地执行
  • 后台线程(true):在应用程序主线程结束时也会随之结束的线程,当只剩下后台线程运行时,应用程序会立即退出,不会等待后台线程完成。通常,将一些非关键的、辅助性的任务设置为后台线程可以使得应用程序更快地退出,例如日志记录、监控等。

线程并不会立即开始#

大多数操作系统都不是实时操作系统,包括 Windows。线程的执行不是立即发生的,而是由操作系统根据自己的调度算法来决定何时执行哪个线程。每个线程被分配一小段 CPU 时间来执行工作,因此即使有多个线程同时运行,也会感觉它们几乎在同时执行。系统会在适当的时机根据算法决定下一个时间点去调度哪个线程。

线程不是编程语言自带的东西,它的调度是一个非常复杂的过程,线程之间的切换需要花一定时间和空间,并且,它不实时。比如:

for (int i = 0; i < 10; i++)
{
	new Thread(() =>
	{
		Console.WriteLine(i);
	}).Start();
}

Output:
Untitled

从输出的结果可以看到,线程并不是立即启动的(多个线程打印了相同的 i 值,比如 5),在循环内直接启动线程,每个线程都会共享相同的变量 i。当某个线程开始执行时,可能会出现其他线程已经修改了 i 的情况,因此多个线程可能会访问到相同的 i 值。

这是因为线程在不同的 CPU 核心上运行,而 CPU 与内存之间具有 寄存器、L1 Cache、L2 Cache、L3 Cache、内存 多级结构,如果不对内存进行锁定,那么在一个 CPU 核心修改了变量的值但是还没有写回到内存中,而另一个 CPU 读取了旧的值时,便会出现脏读。

如果想按照预期结果执行(每个线程负责接收属于自己的 i 值),通过将启动线程的行为封装到函数中,在每次调用函数时,会将当前的 i 作为参数传递给该函数,创建一个新的局部变量 i。这样每个线程都有自己独立的局部变量 i,不会被其他线程影响。因此,每个线程可以得到预期的不同 i 值:

for (int i = 0; i < 10; i++)
{
    StartThread(i);
}

void StartThread(int i)
{
    new Thread(() =>
    {
        Console.WriteLine(i);
    }).Start();
}

线程的优先级(ThreadPriority)#

线程在 C# 中具有不同的优先级(ThreadPriority),我们启动的所有 Thread,包括 ThreadPool 和 Task,默认是 Normal 级别。优先级涉及到操作系统对线程的调度,Windows 系统是一个基于线程优先级的抢占式调度模式,优先级高的总是能获取更多的 CPU 时间,并且在已经就绪时(表示线程已经准备好开始执行,并等待操作系统进行线程调度,例如线程已创建并启动,没有被阻塞或挂起),总是会优先执行

一般不建议修改线程的优先级,除非是非常关键的线程,高优先级的线程应该具备运行时间短,并立刻进入等待状态的特性,否则会长时间占用 CPU 资源导致各种问题。

取消运行中的线程(Thread Cancel)#

在一定时间后,取消正在执行的线程,有联众:

  1. 线程不能立即启动,也不能立即停止,无论采用哪种方式通知线程停止,它都会忙完最紧要的事情,而后在它觉得合适时取消线程。比如 Thread.Abort ,如果线程执行的是一段非托管代码,就不会抛出线程取消异常,只有当代码回到 CLR 中,才会引发线程取消异常,当然,异常也不是立即引发的。
  2. 取消线程依赖于线程能否响应停止请求。线程应该提供 Canceled 接口,并在工作期间检测 Canceled 状态。只有在检测到 Canceled 为 true 时,线程才会退出

.NET 提供了标准的取消模式:协作式取消(Cooperative Cancellation),就是上面第 2 点提到的机制,比如:

var cts = new CancellationTokenSource();
new Thread(() =>
{
    while (true)
    {
        if (cts.IsCancellationRequested)
        {
            Console.WriteLine("线程被取消");
            break;
        }

        Thread.Sleep(100);
    }
}).Start();

Console.ReadKey();
cts.Cancel();

主线程通过 CancellationTokenSourceCancel 方法通知工作线程退出,工作线程以固定频率检测外界是否有 Cancel 信号传入并在合适的时机退出。工作线程自身起到了主要作用并确保正确停止。

CancellationTokenSourceToken 有一个 Register 方法在 cts.Cancel() 时被触发:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("cts canceled"));

Console.ReadKey();
cts.Cancel();

ThreadPool 和 Thread 的取消模式相同。

控制线程数量#

通过任务管理器 > 性能 > CPU 展示的数据,可以算出每个进程平均拥有 10 个线程左右,所以每个程序都不会启动太多线程:
Untitled 1

在网络编程中使用多线程为每个 socket 连接开启一个线程进行监听请求时,如果用户数增多,线程数量也会增加。当线程数量达到一定数量时,会导致计算机资源管理不过来。每个线程需要分配一定的内存空间,而 32 位系统的内存限制通常在 2GB - 3GB 左右,当线程数量达到一定数量时,会耗尽全部内存。此外,过多的线程也会导致 CPU 在线程之间切换的开销过大,损耗大量的 CPU 时间。对于像 Socket 这类 I/O 密集型的应用,更适合使用异步方式处理

创建过多的线程会导致系统资源的过度占用,会严重影响性能甚至导致系统崩溃。并且线程切换开销过大,即线程很难得到足够的 CPU 时间,表现就是需要等待相当长的时间才能执行线程内的操作。

在实际开发中,避免创建过多的线程,合理利用线程池或异步方式来处理任务,以提高性能并减少资源消耗,异步和线程池技术能够高效管理大量线程,实际工作的线程数量较少。

线程池#

线程的空间开销主要来自:

  1. 线程内核对象(Thread Kernel Object):每个线程都会创建一个这样的对象,它主要包含线程上下文信息,占用的内存在 700 字节左右
  2. 线程环境块(Thread Environment Block):占用 4KB 内存
  3. 用户模式栈(User Mode Stack),即线程栈:线程栈用于保存方法的参数、局部变量和返回值。每个线程栈占用 1MB 的内存。要用完这些内存很简单,写一个不能结束的递归方法,让方法参数和返回值不停地消耗内存,很快就会发生 OutOfMemoryException
  4. 内核模式栈(Kernel Mode Stack):当调用操作系统的内核模式函数时,系统会将函数参数从用户模式栈复制到内核模式栈。会占用 12KB 内存

线程的时间开销来自:

  1. 线程创建的时候,系统相继初始化以上这些(空间开销)内存空间
  2. 接着 CLR 会调用所有加载 DLL 的 DLLMain 方法,并传递连接标志(线程终止的时候,也会调用 DLL 的 DLLMain 方法,并传递分离标志)
  3. 线程上下文切换:一个系统中会加载很多的进程,而一个进程又包含若干个线程。但是一个 CPU 在任何时候都只能有一个线程在执行。为了让每个线程看上去都在运行,系统会不断地切换 “线程上下文”:每个线程大概得到几十毫秒的执行时间片,然后就会切换到下一个线程了。这个过程大概又分为以下 5 个步骤:
    1. 进入内核模式
    2. 将上下文信息(主要是一些 CPU 寄存器信息)保存到正在执行的线程内核对象上
    3. 系统获取一个 Spinlock,并确定下一个要执行的线程,然后释放 Spinlock。如果下一个线程不在同一个进程内,则需要进行虚拟地址交换
    4. 从将被执行的线程内核对象上载入上下文信息
    5. 离开内核模式

线程的创建和销毁有时间和空间的代价,为了管理线程的使用,微软提供了线程池技术。线程池会在工作完成后将线程回收到池中,以供其他任务使用,线程的创建和销毁由 CLR 的算法来决定。在实际项目中,更推荐使用线程池来管理线程,可以使用 ThreadPoolBackgroundWorker 这两个类来实现,使用简单方便,比如:

using System.ComponentModel;

// ThreadPool
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("from ThreadPool"));

// BackgroundWorker
var bw = new BackgroundWorker();
bw.DoWork += (object? sender, DoWorkEventArgs e) => Console.WriteLine("from BackgroundWorker");
bw.RunWorkerAsync();

Console.ReadKey();

ThreadPool 和 BackgroundWorker 是两种不同的线程处理技术,它们在使用场景和特点上有一些区别:

  1. ThreadPool(线程池)线程池是一种管理和复用线程的机制,通过预先创建一组线程,并对其进行调度和管理,以便在需要执行任务时分配可用线程。ThreadPool 适用于并行执行大量短期小任务的情况,可以避免频繁地创建和销毁线程的开销。可以使用 ThreadPool.QueueUserWorkItem 方法或 Task.Run 方法将工作项添加到线程池中
  2. BackgroundWorker(后台工作者)BackgroundWorker 是一个封装了异步操作的组件它简化了在 WinForms 和 WPF 应用程序中使用后台线程执行长时间运行任务的过程。BackgroundWorker 提供了进度报告和完成事件,可以轻松地在 UI 线程和后台线程之间进行通信。它适用于需要在后台线程执行较长时间运行的任务,并且还需要更新 UI 的情况

Task#

**[Task](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=net-7.0) 是 .NET 4+ 提供的一种用于异步编程的高级抽象 **。它可以表示一个异步操作或一段可调度的代码,任务可以被分配给线程池中的线程来执行,Task 是超越 ThreadPool 和 BackgroundWorker 的存在,为 ThreadPool 提供了更多的 API 来管理线程,可以使用 Status 属性以及 IsCanceled、 IsCompleted和 IsFaulted 属性来确定任务的状态:

// 创建一个 CancellationTokenSource 用于模拟取消 Task
var cts = new CancellationTokenSource();
Task.Run(() =>
{
    Console.WriteLine("我是异步线程...");
}).ContinueWith(t =>
{
    if (t.IsCanceled)
    {
        Console.WriteLine("线程被取消了");
    }
 
    if (t.IsFaulted)
    {
        Console.WriteLine("发生异常而被取消了");
    }
 
    if (t.IsCompleted)
    {
        Console.WriteLine("线程成功执行完成了");
    }
});

Console.ReadKey();
// 取消 Task
// cts.Cancel();

ContinueWith 天然支持任务完成的通知、数据返回、任务取消、异常等情况的处理。Task 的 Result 属性可以拿到线程执行完返回的值,同时阻塞线程直到拿到返回的结果。

一般使用 Task.Factory.StartNew 来实例化和启动 Task,Task.Factory.ContinueWhenAll(等待所有) 和 Task.Factory.ContinueWhenAny(等待其一) 用来操作多个 Task 的执行结果。

如果想让 Task 变为同步,只需调用 Wait 方法即可。

async / await#

在方法或表达式上使用 async 修饰符,则称为异步方法,且异步方法的返回类型必须是 TaskTask<T> 或者 ValueTask<T>await 运算符 等待一个异步操作完成,并且可以暂时中断当前方法的执行,直到异步操作完成后再继续执行剩下的代码,它俩几乎是成对出现。

同步等待和异步等待#

思考以下代码,最终输出的结果大约是多少:

using System.Diagnostics;

var sw = Stopwatch.StartNew();
var task = MyMethod();
Thread.Sleep(4000);
var result = task.Result;
Console.WriteLine(sw.ElapsedMilliseconds);

static async Task<string> MyMethod()
{
    await Task.Delay(5000);
    return "aaa";
}

这段代码中,首先调用了异步方法 MyMethod,开启了一个线程。在这个线程中,使用 await 异步等待了 5000ms。同时,在主线程中使用同步等待了 4000ms。

在主线程等待了 4000ms 后,又有一个变量在等待异步线程返回一个值。在这之前,主线程和异步线程都在同时进行等待。因此,在等待异步方法返回结果的过程中,还需要等待额外的 1000ms

最后,当 MyMethod 方法返回结果 "aaa" 后,输出了程序执行的时间为 5000ms。
Untitled 2

在异步线程执行之后,如果没有与主线程交互,则不会阻塞主线程执行,只有当主线程需要等待异步方法返回结果,而异步线程还没执行完的情况下,会造成主线程阻塞。

并行计算(Parallel)#

System.Threading.Tasks.Parallel 是一个用于并行编程的静态类。它提供了一组静态方法,可以简化并发执行 Task 的编码过程,主要提供了 Invoke、For 和 ForEach 三个函数。

最常用的方法是 Parallel.ForParallel.ForEach。这两个方法可以将迭代操作并行化,以便在多个线程上同时执行:

Action a = () => Console.WriteLine(DateTime.Now.ToString("HH:mm:ss"));
Parallel.Invoke(Enumerable.Range(1, 5).Select(x => a).ToArray());

Parallel.For(0, 5, i =>
{
    // 循环体逻辑
    Console.WriteLine("Current iteration: " + i);
});

List<int> numbers = new List<int> { 1, 2, 3, 4, 5 };
Parallel.ForEach(numbers, number =>
{
    // 遍历元素的逻辑
    Console.WriteLine("Current number: " + number);
});

Console.WriteLine("hi");

Output:
Untitled 3

可以看到,每种方式的的执行都是无序的,并且 Parallel 启动后是同步运行的,也就是会阻塞当前线程

当使用 Task 时,通常会调用 Run 方法开启异步任务,并可以使用 await 关键字来等待任务完成。这样可以实现异步执行任务,并在需要时等待任务完成后继续进行其他操作。

而 Parallel 类是为了简化并行计算而设计的,它提供了一些静态方法,如 Parallel.For 和 Parallel.ForEach,用于并行执行循环和遍历操作。Parallel 类会自动将任务分配给多个线程,在多核 CPU 上同时执行,以达到并行计算的效果。

总结起来,Task 用于异步编程和任务协调,而 Parallel 类则用于简化并行计算,两者的使用方式和机制有所不同

Parallel 的错误使用#

Parallel 的循环操作支持在每个任务启动时进行初始化操作,结束时进行扫尾操作,并且允许监控任务的状态。注意,之前关于 "任务" 的说法有误,应该将其改为 "线程",比如以下代码:

var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
var sum = 0;

Parallel.For(0, list.Count,
() =>
{
    Console.WriteLine($"localInit i:1, ThreadId:{Environment.CurrentManagedThreadId}");

    return 1;
},
(i, state, total) =>
{
    Console.WriteLine($"body i:{i}, total:{total}, ThreadId:{Environment.CurrentManagedThreadId}");
    total += i;
    return total;
},
i =>
{
    Console.WriteLine($"localFinally i:{i}, ThreadId:{Environment.CurrentManagedThreadId}");
    Interlocked.Add(ref sum, i);
});
Console.WriteLine(sum);

先仔细看下 Parallel.For 的参数说明:

/// <summary>
/// 在并行执行的情况下,执行一个 for 循环。
/// </summary>
/// <typeparam name="TLocal">线程本地数据的类型。</typeparam>
/// <param name="fromInclusive">起始索引(包含)。</param>
/// <param name="toExclusive">终止索引(不包含)。</param>
/// <param name="localInit">一个函数委托,用于为每个线程返回本地数据的初始状态。</param>
/// <param name="body">每次迭代调用的委托。</param>
/// <param name="localFinally">对每个线程的本地状态执行的最后操作的委托。</param>
/// <returns>一个 <see cref="System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> 结构,
/// 包含关于循环的哪个部分已完成的信息。</returns>
/// <remarks>
/// <para>
/// 对于迭代范围 [fromInclusive, toExclusive) 中的每个值,都会调用一次 <paramref name="body"/> 委托。
/// 它接收以下参数:迭代计数(一个 Int32)、可以用于提前退出循环的 <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> 实例,
/// 以及在同一线程上执行的迭代之间可以共享的某些本地状态。
/// </para>
/// <para>
/// **对于参与循环执行的每个线程,都会调用一次 <paramref name="localInit"/> 委托,并返回每个线程的初始本地状态。**
/// **这些初始状态将传递给每个线程上的第一个 <paramref name="body"/> 调用。**
/// **然后,每个后续的 body 调用都返回一个可能被修改的状态值,该状态值传递给下一个 body 调用。**
/// **最后,在每个线程上的最后一个 body 调用返回一个状态值,并将该值传递给 <paramref name="localFinally"/> 委托。**
/// **每个线程的本地状态都会执行 <paramref name="localFinally"/> 委托一次,以执行最后的操作。**
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
            int fromInclusive, int toExclusive,
            Func<TLocal> localInit,
            Func<int, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal> localFinally)

Parallel.For 采用并发的方式启动 For 循环,循环体也就是 body 参数交给线程池去处理,最难理解的三个参数含义如下:

  • localInit:每个新线程创建后都会先执行 localInit 进行初始化行为并返回初始状态,换句话说,localInit 执行了多少次,就代表 Parallel 开启了多少个线程
  • body:循环体内容,初始状态(localInit)将传递给每个线程上的第一个 body,每个后续的 body 调用都返回一个可能被修改的状态值,该状态值传递给下一个 body 调用 **。也就是说如果并发循环 6 次只创建了一个线程,这个线程会执行 6 次 body 参数,localInit 也只会执行一次 **,因为只有一个线程,body 的参数 (i, state, total) 分别代表(当前循环值的范围 [0-list.Count), Parallel 当前状态,localInit 返回的值)
  • localFinally:每个线程上最后一个 body 执行后,执行一次,也可以说每个线程结束时执行一次,他的参数 i 是最后一个 body 返回的状态值,换句话说,localFinally 执行了多少次,就代表 Parallel 开启了多少个线程
// [0-list.Count) 的和
0+1+2+3+4+5=15

每个线程创建时都会执行一次 localInit,每个线程可能执行多个 body,每个线程结束时都会执行一次 localFinally,根据创建的线程数量,最后的 sum 值会有所不同:
线程数量  结果
1         16
2         17
3         18
4         19
5         20
6         21

换个清晰的例子:

var list = new List<string>() { "aa", "bb", "cc", "dd", "ee", "ff", "gg" };
var str = string.Empty;
Parallel.For(0, list.Count, () => "-", (i, state, total) => total += list[i], s =>
{
    str += s;
    Console.WriteLine("end:" + s);
});
Console.WriteLine(str);

Untitled 4
根据结果可知,总共输出了 4 次 "end",这意味着 7(list.Count)次并发循环共创建了 4 个线程,每个线程:

  1. 在创建后都会执行一次 localInit
  2. 只有第一个 body 在执行时能够获得 localInit 返回的状态值。随后的 body 执行时,其 total 值都是基于上一次 body 的返回值
  3. 每个线程的最后一个 body 执行后,在线程释放之前会执行一次 localFinally,并将最后一个 body 的返回值传递给它

并行计算一定比串行快#

由于并行计算需要创建线程,而线程的创建和销毁都需要时间和空间的开销,当循环体执行时间非常短时(没什么耗时操作),并行的速度会比串行更慢,比如:

using System.Diagnostics;

var sw = Stopwatch.StartNew();
for (int i = 0; i < 2000; i++)
{
    for (int j = 0; j < 10; j++)
    {
        var sum = i + j;
    }
}

Console.WriteLine("串行循环耗时:" + sw.ElapsedMilliseconds);

sw.Restart();
Parallel.For(0, 2000, i =>
{
    for (int j = 0; j < 10; j++)
    {
        var sum = i + j;
    }
});
Console.WriteLine("并行循环耗时:" + sw.ElapsedMilliseconds);

Untitled 5
但如果将循环体执行时间加长的话,并行循环就会优于串行循环

for (int j = 0; j < 100000; j++)

Untitled 6
所以只有当循环体执行时间较长时才考虑使用并行计算。

并行计算加锁#

由于并行计算是多线程运行,如果需要访问共享资源,就要加锁以保证数据的一致性。加锁适用于需要同步代码或长时间占用共享资源的情况。

在对整数变量进行原子操作时,可以使用 Interlocked.Add 方法,这极大的减少了同步的性能损耗。

var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
int sum = 0;
Parallel.For(0, list.Count, () => 1, (i, state, total) =>
{
    total += i;
    return total;
}, i => Interlocked.Add(ref sum, i));
Console.WriteLine(sum);

上面这段代码中,如果结尾不进行原子操作,最后可能会导致汇编语言在最后 mov 操作时内存地址的对齐问题,而 Interlocked 解决了这样的问题。同时 .NET 还提供了 [volatile](https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/volatile) 关键字来解决变量的原子性操作问题,但它不适用多线程场景:

var mc = new MyClass();
Parallel.For(0, 100000, i =>
{
    mc.AddCount();
});
Console.WriteLine(mc.Count);

public class MyClass
{
    public volatile int Count;

    public void AddCount()
    {
        Count++;
    }
}

以上输出一定小于 100000,因为对共享资源访问时没有加锁,导致输出的结果不符合预期,在多线程访问的场景下,使用 Interlocked 或 [lock](https://learn.microsoft.com/zh-cn/dotnet/csharp/language-reference/statements/lock) 语句保护访问共享资源:

// 使用以下两种方式中任意一种修改上面的代码

// 方式一
public void AddCount()
{
    Interlocked.Add(ref Count, 1);
    //Count++;
}

// 方式二
Parallel.For(0, 100000, i =>
{
    lock (mc)
    {
        mc.AddCount();
    }
});

但这样带来了新的问题,由于同步锁的存在,会增加系统开销(CPU 时间和内存),线程切换时间等,也就是说,如果需要加锁循环体内全部代码,就完全没必要使用并行计算了,因为这样比串行计算还耗时。

PLINQ#

传统的 LINQ 是单线程串行执行的,而 PLINQ 是 LINQ 模式的并行实现,也就是 Parallel LINQ。PLINQ 的实现几乎都在 System.Linq.ParallelEnumerable 类中,它的执行模式:PLINQ 介绍 | Microsoft Learn,总结就是 PLINQ 内部会根据分析的结果,选择并行或串行执行,以获得最优的查询速度。比如:

var list = Enumerable.Range(1, 6);
var query = from i in list
            select i;
foreach (var i in query)
{
    Console.WriteLine(i);
}

Console.WriteLine("----------");
var query2 = from i in list.AsParallel()
             select i;
foreach (var i in query2)
{
    Console.WriteLine(i);
}

Untitled 7
可以看到 LINQ 顺序输出,而 PLINQ 无序输出(并发多线程)。

在实际开发中,并行不一定总是比串行快,需要根据使用场景找到最佳的方式

并行编程的异常处理#

考虑以下代码是否会抛出异常,:

MyMethod();

static async Task MyMethod()
{
  await Task.CompletedTask;
  throw new Exception();
}

实际上不会抛出异常,因为 MyMethod 是一个异步方法,他在另外一个线程里执行和发生异常,但由于它没有与调用线程(主线程)进行交互,所以调用者并不知道他是否异常。

Task 中的异常处理#

如果 Task 可以进行交互,比如调用 Task 的 Wait、WaitAny、WaitAll 等阻塞方法,或者获取 Task 的 Result 属性时,可以捕获到 Task 内发生的异常,异常类型为 AggregateException,该异常是并行编程中最顶层的异常。

如果可以通过阻塞(同步)获取,就使用 Task 的 Wait* 阻塞方法,或者使用 await Task,阻塞后同步代码的异常需要通过 try/catch 来捕获。

如果需要非阻塞式(异步)捕获 Task 的异常就用 Task 的 ContinueWith 或事件通知(这个太麻烦了)。

Parallel 中的异常处理#

相较于 Task,Parallel 的异常处理要简单很多,因为 Parallel 是同步运行的,即阻塞主线程,它里面抛出的异常能直接被主线程捕获:

using System.Collections.Concurrent;

// 线程安全的队列
var exs = new ConcurrentQueue<Exception>();
try
{
    Parallel.For(0, 2, i =>
    {
        try
        {
            throw new ArgumentException();
        }
        catch (Exception e)
        {
            exs.Enqueue(e);
            throw new AggregateException(exs);
        }
    });
}
catch (AggregateException e)
{
    foreach (var ex in e.InnerExceptions)
    {
        Console.WriteLine($"异常类型:{ex.GetType()},异常源:{ex.Source},异常信息:{ex.Message}");
    }
}

System.Console.WriteLine(exs.Count);

参考#

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。