概要#
この記事は、マルチスレッド、非同期、タスク、および並列計算について深く理解することを目的としており、これらは総称して並列プログラミング(Task Parallel Library)と呼ばれます。
マルチスレッドと非同期#
マルチスレッドと非同期は異なる概念です。これを区別できないと、以下のような誤ったコードを書くことになりやすいです:
void button1_Click()
{
new Thread(() =>
{
var client = new WebClient();
var content = client.DownloadString("https://myesn.cn");
Console.WriteLine(content);
}).Start();
}
上記のコードは、ボタンがクリックされたときに、スレッドを作成してウェブページの内容をダウンロードします。これは UI スレッドのブロックを避けるためのものです。
しかし、これは非効率な実装です。これを理解するには、コンピュータの構成原理から始める必要があります。コンピュータのハードウェアには、多くのハードウェアサブシステムが「IO 操作のDMA(Direct Memory Access)モード」、つまり直接メモリアクセスを備えています。.NET のCLRが提供する非同期プログラミングモデルは、ハードウェアの DMA 機能を最大限に活用して CPU への負担を軽減するためのものです。
上記のコードの図示は以下の通りです:
非効率な理由は、スレッドが内部コードを実行している間、CPU リソースを常に占有するためです。実行が完了するまで、CPU リソースを占有し続けます。非同期の方法で実装を変更すると:
void button1_Click()
{
var client = new WebClient();
client.DownloadStringCompleted += (sender, e) =>
{
Console.WriteLine(e.Result);
};
client.DownloadStringAsync(new Uri("https://myesn.cn"));
}
改良されたコードは非同期モードを採用し、底層ではスレッドプール管理(CLR Thread Pool)を使用しています。非同期操作が開始されると、CLR はウェブページのダウンロード操作をスレッドプール内のスレッドに委任します。IO 操作を開始すると、非同期は作業スレッドをスレッドプールに返し、CPU リソースを占有しなくなります。非同期が完了すると、WebClient はダウンロード完了イベントを通知し、CLR は非同期操作の完了に応じます。このように、非同期モードはスレッドプールを利用して CPU リソースを大幅に節約します。図示は以下の通りです:
したがって、マルチスレッドと非同期の実行フローは大体以下のようになります:
両者の適用シーン:CPU 集約型はマルチスレッドを使用し、I/O 集約型は非同期(読み書きとデータ転送)を使用します。
読み書きやデータ転送に関するものはすべて I/O 集約型に属し、それ以外は CPU 集約型、または計算集約型と呼ばれます。
スレッド同期#
マルチスレッド環境において、スレッド同期は共有リソースへの安全なアクセスを確保し、複数のスレッドが同時に共有リソースを変更することによってデータの不整合やその他の問題を防ぐためのものです。通常、ロックメカニズムを使用してスレッド同期を実現します。
オブジェクト指向言語では、データ型は値型と参照型に分けられます。値型には整数、浮動小数点数、構造体などが含まれ、参照型はクラス、配列などのオブジェクトへの参照を指します。
ほとんどの場合、参照型でスレッド同期を実現できます。つまり、共有リソースのオブジェクトをロックすることで、複数のスレッドがそのリソースに安全にアクセスできるようにします。これは、組み込みのロックメカニズム(例えば、lock
キーワードやsynchronized
を使用)やその他の同期ツールを使用して実現できます。
しかし、値型のコピー動作や各スレッドが独自のスタックを持つため、値型は直接ロックや待機ができません。各スレッドは自分の値型変数のコピーを操作しており、相互に干渉しません。
C# では、Microsoft が提供するキーワード構文糖lock
を使用します。lock
キーワードは実際にはMonitor
クラスの簡略化された使用です。lock
キーワードは自動的にMonitor.Enter
とMonitor.Exit
メソッドを呼び出してオブジェクトをロックおよびアンロックします。
シグナル同期は、複数のスレッド間で調整と同期操作を行うためのメカニズムです。特定の条件が満たされたときにスレッドが待機し、条件が満たされたときに他のスレッドに続行を通知することを保証します。シグナル同期メカニズムに関与する型はすべて抽象クラスWaitHandle
から継承されており、その関係は以下の通りです:
EventWaitHandle
は、オペレーティングシステムのカーネルによって生成されるブール値で、ブロック状態を示します。Set
メソッドを呼び出すと、true
(シグナルありtrue
、シグナルなしfalse
)に設定され、スレッドのブロックが解除されます。AutoResetEvent
とManualResetEvent
はどちらもEventWaitHandle
のサブクラスです。AutoResetEvent
はSet
メソッドを呼び出した後、自動的にブロック状態をfalse
にリセットし、待機しているスレッドのうち 1 つだけが起こされます。ManualResetEvent
はSet
メソッドを呼び出した後、自動的にブロック状態をリセットせず、すべての待機スレッドが起こされ、Reset
メソッドを呼び出してブロック状態をfalse
にリセットするまで続きます。
Semaphore
は、システムカーネルによって生成される整数変数をカウンターとして維持します。カウンターの値が 0 の場合は待機を示し、0 より大きい場合はブロックを解除し、カウンターを減少させます。初期化時に待機できるスレッドの最大数を制限できます。Mutex
は、アプリケーションドメインを越えたスレッドのブロックと解除の能力を解決します。これも、共有リソースへのアクセスを同期するためにシステムカーネルによって生成されたフラグを維持します。Mutex のロックを取得できるのは 1 つのスレッドだけで、他のスレッドはロックが解放されるのを待つ必要があります。
AutoResetEvent
を使用した例は以下の通りです:
var test = new Test();
test.StartThread();
Console.ReadKey();
test.SendSignal();
Console.ReadKey();
class Test
{
private AutoResetEvent _autoResetEvent { get; set; } = new AutoResetEvent(false);
public void StartThread()
{
new Thread(() =>
{
Console.WriteLine("スレッド1が開始され、信号を待っています...");
_autoResetEvent.WaitOne(); //todo:いくつかの複雑な作業を処理
Console.WriteLine("スレッド1が作業を続けます...");
}).Start();
new Thread(() =>
{
Console.WriteLine("スレッド2が開始され、信号を待っています...");
_autoResetEvent.WaitOne(); //todo:いくつかの複雑な作業を処理
Console.WriteLine("スレッド2が作業を続けます...");
}).Start();
}
public void SendSignal() => _autoResetEvent.Set();
}
まず、AutoResetEvent
のインスタンスを作成し、初期値をfalse
に設定して信号なしを示します。次に、StartThread 関数を呼び出して 2 つのスレッドを起動し、各スレッドは内部で信号を待機します。その後、SendSignal 関数を呼び出し、その内部でSet()
を使用して信号を送信します。この時、待機しているスレッドのうち 1 つだけが起こされることに気づくでしょう。すべての待機スレッドを起こしたい場合は、ManualResetEvent
を使用します。
参照型であれば、自由にロックできるのか?#
ロックはスレッド同期メカニズムであり、共有リソースをロックすることで、マルチスレッドアクセス時に 1 つのスレッドだけが占有できることを保証します。しかし、すべてのオブジェクトがロックとして使用できるわけではありません。
ロックオブジェクトを選択する際には、次の点に注意が必要です:
- ロックオブジェクトは複数のスレッドで可視の同一オブジェクトであるべきです。
- 非静的メソッドでは、静的変数をロックオブジェクトとして使用すべきではありません。
- 値型はロックオブジェクトとして使用できません。値型は直接ロックや待機ができません。各スレッドは自分の値型変数のコピーを操作しており、相互に干渉しません。
- 文字列をロックオブジェクトとして使用することを避けてください。文字列はメモリ内で定数として存在し、複数の変数が同じ文字列に値を割り当てられると、それらは同じメモリ空間を参照します。
- ロックオブジェクトの可視性を低下させます。文字列は可視範囲が最も広いロックオブジェクトであり、次に
typeof(class)
の戻り値です。これは、クラスのすべてのインスタンスがtypeof
の戻り値を指すためです。
クラス(class)の静的メソッドはスレッドセーフであることを保証する必要がありますが、非静的メソッドはスレッドセーフである必要はありません。
一般的に、ロックオブジェクトは公共の変数やプロパティであってはなりません。 .NET の一般的なコレクションタイプ(System.Collections.ICollection の実装)には、List などがあり、スレッドセーフなコレクション操作を実現するために公有プロパティSyncRoot
を提供していますが、コレクション操作の大部分のアプリケーションシーンは単一スレッド操作です。スレッド同期自体は比較的時間がかかるため、このフィールドが公開されているのは、呼び出し元が操作時にスレッドセーフが必要かどうかを決定できるようにするためですが、一般的にはマルチスレッドの場合、スレッドセーフなコレクション(System.Collections.Concurrent 名前空間内)を使用することをお勧めします:ConcurrentBag、ConcurrentDictionary など。
スレッドの IsBackground#
.NET では、スレッドをフォアグラウンドで実行(デフォルト)またはバックグラウンドで実行するように設定できます。各スレッドにはIsBackground
プロパティがあります:
- フォアグラウンドスレッド(false デフォルト値):すべてのフォアグラウンドスレッドが実行を完了すると、アプリケーションは直ちに終了します。通常、完了する必要がある重要なタスクをフォアグラウンドスレッドとして設定し、それらが完全に実行されることを保証します。
- バックグラウンドスレッド(true):アプリケーションのメインスレッドが終了すると同時に終了するスレッドです。バックグラウンドスレッドのみが残っている場合、アプリケーションは直ちに終了し、バックグラウンドスレッドの完了を待ちません。通常、重要でない補助的なタスクをバックグラウンドスレッドとして設定することで、アプリケーションがより早く終了できるようにします。例えば、ログ記録、監視などです。
スレッドはすぐに開始されるわけではない#
ほとんどのオペレーティングシステムはリアルタイムオペレーティングシステムではなく、Windows を含みます。スレッドの実行は即座に発生するのではなく、オペレーティングシステムが独自のスケジューリングアルゴリズムに基づいて、どのスレッドをいつ実行するかを決定します。各スレッドには、作業を実行するための CPU 時間の小さなセグメントが割り当てられます。そのため、複数のスレッドが同時に実行されていても、ほぼ同時に実行されているように感じます。システムは適切なタイミングでアルゴリズムに基づいて次のスレッドをスケジュールすることを決定します。
スレッドはプログラミング言語に組み込まれているものではなく、そのスケジューリングは非常に複雑なプロセスです。スレッド間の切り替えには一定の時間と空間が必要であり、リアルタイムではありません。例えば:
for (int i = 0; i < 10; i++)
{
new Thread(() =>
{
Console.WriteLine(i);
}).Start();
}
出力:
出力結果から、スレッドは即座に起動されないことがわかります(複数のスレッドが同じi
値を印刷しました。例えば 5)。ループ内で直接スレッドを起動すると、各スレッドは同じ変数i
を共有します。あるスレッドが実行を開始すると、他のスレッドがi
を変更している可能性があるため、複数のスレッドが同じi
値にアクセスする可能性があります。
これは、スレッドが異なる CPU コアで実行され、CPU とメモリの間にレジスタ、L1キャッシュ、L2キャッシュ、L3キャッシュ、メモリ
の多層構造があるためです。メモリをロックしない場合、ある CPU コアで変数の値が変更され、まだメモリに書き戻されていない間に、別の CPU が古い値を読み取ると、ダーティリードが発生します。
期待される結果を実行したい場合(各スレッドが自分のi
値を受け取る責任を持つ)、スレッドを起動する動作を関数にカプセル化し、関数を呼び出すたびに現在のi
を引数としてその関数に渡すことで、新しいローカル変数i
を作成します。これにより、各スレッドは独自のローカル変数i
を持ち、他のスレッドに影響を与えません。そのため、各スレッドは期待される異なるi
値を得ることができます:
for (int i = 0; i < 10; i++)
{
StartThread(i);
}
void StartThread(int i)
{
new Thread(() =>
{
Console.WriteLine(i);
}).Start();
}
スレッドの優先度(ThreadPriority)#
スレッドは C# で異なる優先度(ThreadPriority)を持ち、起動するすべての Thread、ThreadPool、Task はデフォルトで Normal レベルです。優先度はオペレーティングシステムによるスレッドのスケジューリングに関与し、Windows システムはスレッド優先度に基づくプリエンプティブスケジューリングモデルです。優先度が高いスレッドは常により多くの CPU 時間を取得し、準備が整ったとき(スレッドが作成されて起動され、ブロックまたはサスペンドされていないことを示します)には常に優先的に実行されます。
一般的に、スレッドの優先度を変更することは推奨されません。非常に重要なスレッドでない限り、高優先度のスレッドは短い実行時間を持ち、すぐに待機状態に入る特性を持つべきです。そうでない場合、CPU リソースを長時間占有し、さまざまな問題を引き起こす可能性があります。
実行中のスレッドをキャンセルする(Thread Cancel)#
一定の時間後に実行中のスレッドをキャンセルすることには、以下のような関連があります:
- スレッドは即座に起動することも、即座に停止することもできません。どの方法でスレッドを停止するように通知しても、最も重要なことを終えた後に、適切だと感じたときにスレッドをキャンセルします。例えば、
Thread.Abort
を使用した場合、スレッドが非管理コードを実行している場合、スレッドキャンセル例外はスローされません。コードが CLR に戻るときにのみ、スレッドキャンセル例外が発生します。もちろん、例外も即座には発生しません。 - スレッドのキャンセルは、スレッドが停止要求に応じるかどうかに依存します。スレッドは Canceled インターフェースを提供し、作業中に Canceled 状態を検出する必要があります。Canceled が true であると検出されたときにのみ、スレッドは終了します。
.NET は標準的なキャンセルモデルを提供しています:協調的キャンセル(Cooperative Cancellation)、これは上記の第 2 点で述べたメカニズムです。例えば:
var cts = new CancellationTokenSource();
new Thread(() =>
{
while (true)
{
if (cts.IsCancellationRequested)
{
Console.WriteLine("スレッドがキャンセルされました");
break;
}
Thread.Sleep(100);
}
}).Start();
Console.ReadKey();
cts.Cancel();
メインスレッドはCancellationTokenSource
のCancel
メソッドを使用して作業スレッドに終了を通知し、作業スレッドは一定の頻度で外部からキャンセル信号が入っているかどうかを検出し、適切なタイミングで終了します。作業スレッド自体が主要な役割を果たし、正しく停止することを保証します。
CancellationTokenSource
のToken
には、cts.Cancel()
時にトリガーされるRegister
メソッドがあります:
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("ctsがキャンセルされました"));
Console.ReadKey();
cts.Cancel();
ThreadPool と Thread のキャンセルモデルは同じです。
スレッド数の制御#
タスクマネージャー > パフォーマンス > CPU に表示されるデータから、各プロセスは平均して約 10 スレッドを持っていることがわかります。そのため、各プログラムはあまり多くのスレッドを起動しません:
ネットワークプログラミングで各ソケット接続に対してスレッドを起動してリクエストをリッスンする場合、ユーザー数が増えるとスレッド数も増加します。スレッド数が一定の数に達すると、コンピュータのリソース管理が追いつかなくなります。各スレッドには一定のメモリ空間が割り当てられる必要があり、32 ビットシステムのメモリ制限は通常 2GB - 3GB 程度です。スレッド数が一定の数に達すると、すべてのメモリが枯渇します。さらに、スレッドが多すぎると、CPU がスレッド間で切り替えるオーバーヘッドが大きくなり、大量の CPU 時間を消費します。Socket のような I/O 集約型のアプリケーションには、非同期方式で処理する方が適しています。
スレッドを過剰に作成すると、システムリソースが過度に消費され、パフォーマンスに深刻な影響を与えたり、システムがクラッシュしたりする可能性があります。また、スレッド切り替えのオーバーヘッドが大きく、スレッドが十分な CPU 時間を得ることが難しくなり、スレッド内の操作を実行するためにかなりの時間を待つ必要があります。
実際の開発では、過剰なスレッドの作成を避け、スレッドプールや非同期方式を合理的に利用してタスクを処理し、パフォーマンスを向上させ、リソース消費を削減することが重要です。非同期とスレッドプール技術は、大量のスレッドを効率的に管理でき、実際に作業するスレッド数は少なくなります。
スレッドプール#
スレッドの空間オーバーヘッドは主に以下から来ます:
- スレッドカーネルオブジェクト(Thread Kernel Object):各スレッドはこのようなオブジェクトを作成し、主にスレッドのコンテキスト情報を含み、占有するメモリは約 700 バイトです。
- スレッド環境ブロック(Thread Environment Block):占有するメモリは 4KB です。
- ユーザーモードスタック(User Mode Stack)、つまりスレッドスタック:スレッドスタックはメソッドの引数、ローカル変数、戻り値を保存するために使用されます。各スレッドスタックは 1MB のメモリを占有します。これらのメモリを使い果たすのは簡単で、終了しない再帰メソッドを書いて、メソッドの引数と戻り値がメモリを消費し続けると、すぐに OutOfMemoryException が発生します。
- カーネルモードスタック(Kernel Mode Stack):オペレーティングシステムのカーネルモード関数を呼び出すと、システムは関数の引数をユーザーモードスタックからカーネルモードスタックにコピーします。これにより 12KB のメモリを占有します。
スレッドの時間オーバーヘッドは以下から来ます:
- スレッドが作成されると、システムは上記の(空間オーバーヘッド)メモリ空間を初期化します。
- 次に、CLR はすべての DLL をロードする際に DLLMain メソッドを呼び出し、接続フラグを渡します(スレッドが終了するときにも、DLL の DLLMain メソッドが呼び出され、分離フラグが渡されます)。
- スレッドコンテキスト切り替え:システムには多くのプロセスがロードされており、1 つのプロセスには複数のスレッドが含まれます。しかし、1 つの CPU は常に 1 つのスレッドしか実行できません。各スレッドが実行されているように見えるように、システムは「スレッドコンテキスト」を切り替え続けます。各スレッドは約数十ミリ秒の実行時間を得て、次のスレッドに切り替えられます。このプロセスはおおよそ以下の 5 つのステップに分かれます:
- カーネルモードに入る
- コンテキスト情報(主にいくつかの CPU レジスタ情報)を実行中のスレッドカーネルオブジェクトに保存する
- システムは Spinlock を取得し、次に実行するスレッドを決定し、Spinlock を解放します。次のスレッドが同じプロセス内にない場合、仮想アドレスの交換が必要です。
- 実行されるスレッドカーネルオブジェクトからコンテキスト情報をロードする
- カーネルモードを離れる
スレッドの作成と破棄には時間と空間のコストがあります。スレッドの使用を管理するために、Microsoft はスレッドプール技術を提供しています。スレッドプールは、作業が完了した後にスレッドをプールに回収し、他のタスクで使用できるようにします。スレッドの作成と破棄は CLR のアルゴリズムによって決定されます。実際のプロジェクトでは、スレッドを管理するためにスレッドプールを使用することをお勧めします。次の 2 つのクラスを使用して実装できます:ThreadPoolとBackgroundWorker。使いやすく便利です。例えば:
using System.ComponentModel;
// ThreadPool
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("ThreadPoolから"));
// BackgroundWorker
var bw = new BackgroundWorker();
bw.DoWork += (object? sender, DoWorkEventArgs e) => Console.WriteLine("BackgroundWorkerから");
bw.RunWorkerAsync();
Console.ReadKey();
ThreadPool と BackgroundWorker は異なるスレッド処理技術であり、使用シーンや特徴にいくつかの違いがあります:
- ThreadPool(スレッドプール):スレッドプールはスレッドを管理し再利用するメカニズムです。事前に一組のスレッドを作成し、それをスケジュールおよび管理して、タスクを実行する必要があるときに利用可能なスレッドを割り当てます。ThreadPool は大量の短期小タスクを並行して実行する場合に適しており、スレッドの頻繁な作成と破棄のオーバーヘッドを回避できます。
ThreadPool.QueueUserWorkItem
メソッドまたはTask.Run
メソッドを使用して作業項目をスレッドプールに追加できます。 - BackgroundWorker(バックグラウンドワーカー):BackgroundWorker は非同期操作をカプセル化したコンポーネントです。WinForms や WPF アプリケーションでバックグラウンドスレッドを使用して長時間実行されるタスクを実行するプロセスを簡素化します。BackgroundWorker は進捗報告と完了イベントを提供し、UI スレッドとバックグラウンドスレッド間で簡単に通信できます。バックグラウンドスレッドで長時間実行されるタスクを実行し、UI を更新する必要がある場合に適しています。
タスク#
**[Task](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=net-7.0)
は.NET 4 + で提供される非同期プログラミングのための高レベルの抽象化です **。これは非同期操作またはスケジュール可能なコードの一部を表すことができ、タスクはスレッドプール内のスレッドに割り当てて実行できます。Task は ThreadPool や BackgroundWorker を超えた存在であり、ThreadPool に対してスレッドを管理するためのより多くの API を提供します。タスクの状態を確認するために、Statusプロパティや、IsCanceled、IsCompleted、IsFaultedプロパティを使用できます:
// タスクのキャンセルをシミュレートするためのCancellationTokenSourceを作成
var cts = new CancellationTokenSource();
Task.Run(() =>
{
Console.WriteLine("私は非同期スレッドです...");
}).ContinueWith(t =>
{
if (t.IsCanceled)
{
Console.WriteLine("スレッドがキャンセルされました");
}
if (t.IsFaulted)
{
Console.WriteLine("例外が発生してキャンセルされました");
}
if (t.IsCompleted)
{
Console.WriteLine("スレッドが正常に実行を完了しました");
}
});
Console.ReadKey();
// タスクをキャンセル
// cts.Cancel();
ContinueWith
はタスク完了の通知、データの返却、タスクのキャンセル、例外などの処理を自然にサポートします。Task のResult
プロパティを使用すると、スレッドが実行を完了した後に返された値を取得でき、結果を得るまでスレッドをブロックします。
一般的には、Task.Factory.StartNew
を使用して Task をインスタンス化および起動し、Task.Factory.ContinueWhenAll
(すべてを待機)やTask.Factory.ContinueWhenAny
(いずれかを待機)を使用して複数の Task の実行結果を操作します。
Task を同期にしたい場合は、Wait
メソッドを呼び出すだけです。
async / await#
メソッドや式にasync修飾子
を使用すると、それは非同期メソッドと呼ばれ、非同期メソッドの戻り値の型はTask
、Task<T>
またはValueTask<T>
でなければなりません。await演算子
は非同期操作が完了するのを待機し、現在のメソッドの実行を一時的に中断し、非同期操作が完了した後に残りのコードの実行を続けることができます。これらはほぼ対で出現します。
同期待機と非同期待機#
以下のコードを考えて、最終的な出力結果はおおよそどのくらいになるでしょうか:
using System.Diagnostics;
var sw = Stopwatch.StartNew();
var task = MyMethod();
Thread.Sleep(4000);
var result = task.Result;
Console.WriteLine(sw.ElapsedMilliseconds);
static async Task<string> MyMethod()
{
await Task.Delay(5000);
return "aaa";
}
このコードでは、最初に非同期メソッドMyMethod
を呼び出し、スレッドを開始します。このスレッド内で、await
を使用して 5000ms の非同期待機を行います。同時に、メインスレッドでは 4000ms の同期待機を行います。
メインスレッドが 4000ms 待機した後、別の変数が非同期スレッドから値を返すのを待っています。その前に、メインスレッドと非同期スレッドは同時に待機しています。したがって、非同期メソッドが結果を返すのを待っている間に、追加で 1000ms 待機する必要があります。
最終的に、MyMethod
メソッドが結果 "aaa" を返した後、プログラムの実行時間は 5000ms になります。
非同期スレッドが実行された後、メインスレッドと相互作用しない場合、メインスレッドの実行をブロックすることはありません。ただし、メインスレッドが非同期メソッドの結果を待つ必要があり、非同期スレッドがまだ完了していない場合、メインスレッドがブロックされることになります。
並列計算(Parallel)#
System.Threading.Tasks.Parallel
は、並列プログラミングのための静的クラスです。これはタスクの並行実行のコーディングプロセスを簡素化するための一連の静的メソッドを提供します。主に Invoke、For、ForEach の 3 つの関数を提供します。
最も一般的に使用されるメソッドはParallel.For
とParallel.ForEach
です。これらのメソッドは、反復操作を並列化し、複数のスレッドで同時に実行できるようにします:
Action a = () => Console.WriteLine(DateTime.Now.ToString("HH:mm:ss"));
Parallel.Invoke(Enumerable.Range(1, 5).Select(x => a).ToArray());
Parallel.For(0, 5, i =>
{
// ループ本体のロジック
Console.WriteLine("現在の反復: " + i);
});
List<int> numbers = new List<int> { 1, 2, 3, 4, 5 };
Parallel.ForEach(numbers, number =>
{
// 要素のロジック
Console.WriteLine("現在の数: " + number);
});
Console.WriteLine("こんにちは");
出力:
各種の実行は無秩序であり、Parallel が起動すると現在のスレッドをブロックします。
タスクを使用する場合、通常は Run メソッドを呼び出して非同期タスクを開始し、await キーワードを使用してタスクの完了を待機します。これにより、タスクを非同期に実行し、必要に応じてタスクの完了後に他の操作を続行できます。
Parallel クラスは、並列計算を簡素化するために設計されており、Parallel.For や Parallel.ForEach のような静的メソッドを提供して、ループや反復操作を並列に実行します。Parallel クラスは自動的にタスクを複数のスレッドに割り当て、多核 CPU で同時に実行し、並列計算の効果を達成します。
要約すると、Task は非同期プログラミングとタスク調整に使用され、Parallel クラスは並列計算を簡素化するために使用され、両者の使用方法とメカニズムには違いがあります。
Parallel の誤った使用#
Parallel のループ操作は、各タスクの開始時に初期化操作を行い、終了時に後処理操作を行うことをサポートし、タスクの状態を監視することも許可します。注意すべきは、以前の「タスク」に関する説明は誤りであり、「スレッド」とするべきです。例えば、以下のコード:
var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
var sum = 0;
Parallel.For(0, list.Count,
() =>
{
Console.WriteLine($"localInit i:1, ThreadId:{Environment.CurrentManagedThreadId}");
return 1;
},
(i, state, total) =>
{
Console.WriteLine($"body i:{i}, total:{total}, ThreadId:{Environment.CurrentManagedThreadId}");
total += i;
return total;
},
i =>
{
Console.WriteLine($"localFinally i:{i}, ThreadId:{Environment.CurrentManagedThreadId}");
Interlocked.Add(ref sum, i);
});
Console.WriteLine(sum);
Parallel.For
のパラメータ説明をよく見てください:
/// <summary>
/// 並列実行の状況でforループを実行します。
/// </summary>
/// <typeparam name="TLocal">スレッドローカルデータの型。</typeparam>
/// <param name="fromInclusive">開始インデックス(含む)。</param>
/// <param name="toExclusive">終了インデックス(含まない)。</param>
/// <param name="localInit">各スレッドのローカルデータの初期状態を返すための関数デリゲート。</param>
/// <param name="body">各反復で呼び出されるデリゲート。</param>
/// <param name="localFinally">各スレッドのローカル状態に対して最後の操作を実行するデリゲート。</param>
/// <returns>ループのどの部分が完了したかに関する情報を含む<see cref="System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see>構造体。</returns>
/// <remarks>
/// <para>
/// 反復範囲[fromInclusive, toExclusive)の各値に対して、<paramref name="body"/>デリゲートが1回呼び出されます。
/// それは以下のパラメータを受け取ります:反復カウント(Int32)、ループの早期終了に使用できる<see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see>インスタンス、
/// 同じスレッドで実行される反復間で共有できるローカル状態のいくつか。
/// </para>
/// <para>
/// **ループに参加する各スレッドに対して、<paramref name="localInit"/>デリゲートが1回呼び出され、各スレッドの初期ローカル状態を返します。**
/// **これらの初期状態は、各スレッドの最初の<paramref name="body"/>呼び出しに渡されます。**
/// **その後、各後続のbody呼び出しは、変更された可能性のある状態値を返し、その状態値は次のbody呼び出しに渡されます。**
/// **最後に、各スレッドの最後のbody呼び出しが状態値を返し、その値が<paramref name="localFinally"/>デリゲートに渡されます。**
/// **各スレッドのローカル状態は、<paramref name="localFinally"/>デリゲートを1回実行して最後の操作を実行します。**
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
int fromInclusive, int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
Parallel.For
は並行方式で For ループを開始し、ループ本体は body パラメータがスレッドプールで処理されます。最も理解しにくい 3 つのパラメータの意味は以下の通りです:
- localInit:各新しいスレッドが作成された後、最初に
localInit
を実行して初期化行動を行い、初期状態を返します。言い換えれば、localInit が実行された回数は、Parallel が開始したスレッドの数を示します。 - body:ループ本体の内容、初期状態(localInit)は各スレッドの最初の body に渡されます。各後続の body 呼び出しは、前回の body の戻り値に基づいて total 値を持ちます。つまり、並行ループが 6 回実行されて 1 つのスレッドしか作成されなかった場合、このスレッドは 6 回 body パラメータを実行し、localInit は 1 回しか実行されません。なぜなら、スレッドが 1 つしかないからです。body のパラメータ (i, state, total) はそれぞれ(現在のループ値の範囲 [0-list.Count)、Parallel の現在の状態、localInit が返した値)を示します。
- localFinally:各スレッドの最後の body が実行された後に 1 回実行されます。つまり、各スレッドが終了するたびに 1 回実行され、パラメータ i は最後の body が返した状態値です。言い換えれば、localFinally が実行された回数は、Parallel が開始したスレッドの数を示します。
// [0-list.Count)の合計
0+1+2+3+4+5=15
各スレッドが作成されるとlocalInitが1回実行され、各スレッドは複数のbodyを実行する可能性があります。各スレッドの最後のbodyが実行された後、スレッドが解放される前に1回localFinallyが実行されます。作成されたスレッドの数に応じて、最終的なsum値は異なります:
スレッド数 結果
1 16
2 17
3 18
4 19
5 20
6 21
別の明確な例:
var list = new List<string>() { "aa", "bb", "cc", "dd", "ee", "ff", "gg" };
var str = string.Empty;
Parallel.For(0, list.Count, () => "-", (i, state, total) => total += list[i], s =>
{
str += s;
Console.WriteLine("end:" + s);
});
Console.WriteLine(str);
結果から、合計出力が 4 回 "end" であることがわかります。これは、7(list.Count)回の並行ループで 4 つのスレッドが作成されたことを示しています。各スレッドは:
- 作成後に localInit を 1 回実行します。
- 最初の body が実行されると、localInit が返した状態値を取得できます。以降の body の実行時、その total 値は前回の body の戻り値に基づいています。
- 各スレッドの最後の body が実行された後、スレッドが解放される前に 1 回 localFinally が実行され、その最後の body の戻り値が渡されます。
並列計算は必ずしも直列より速いわけではない#
並列計算にはスレッドを作成する必要があり、スレッドの作成と破棄には時間と空間のオーバーヘッドが必要です。ループ本体の実行時間が非常に短い場合(特に時間のかかる操作がない場合)、並列の速度は直列よりも遅くなります。例えば:
using System.Diagnostics;
var sw = Stopwatch.StartNew();
for (int i = 0; i < 2000; i++)
{
for (int j = 0; j < 10; j++)
{
var sum = i + j;
}
}
Console.WriteLine("直列ループの所要時間:" + sw.ElapsedMilliseconds);
sw.Restart();
Parallel.For(0, 2000, i =>
{
for (int j = 0; j < 10; j++)
{
var sum = i + j;
}
});
Console.WriteLine("並列ループの所要時間:" + sw.ElapsedMilliseconds);
しかし、ループ本体の実行時間を長くすると、並列ループは直列ループよりも優れた性能を発揮します:
for (int j = 0; j < 100000; j++)
したがって、ループ本体の実行時間が長い場合にのみ、並列計算を検討するべきです。
並列計算でのロック#
並列計算はマルチスレッドで実行されるため、共有リソースにアクセスする必要がある場合は、データの一貫性を保証するためにロックを使用する必要があります。ロックは、コードを同期させる必要がある場合や、共有リソースを長時間占有する場合に適用されます。
整数変数に対して原子操作を行う場合、Interlocked.Add
メソッドを使用できます。これにより、同期のパフォーマンス損失が大幅に減少します。
var list = new List<int>() { 1, 2, 3, 4, 5, 6 };
int sum = 0;
Parallel.For(0, list.Count, () => 1, (i, state, total) =>
{
total += i;
return total;
}, i => Interlocked.Add(ref sum, i));
Console.WriteLine(sum);
上記のコードでは、最後に原子操作を行わないと、最終的にアセンブリ言語での最後の mov 操作時にメモリアドレスの整列問題が発生する可能性がありますが、Interlocked はこの問題を解決します。同時に.NET は、変数の原子操作問題を解決するために[volatile](https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/volatile)
キーワードを提供していますが、これはマルチスレッドシーンには適用できません:
var mc = new MyClass();
Parallel.For(0, 100000, i =>
{
mc.AddCount();
});
Console.WriteLine(mc.Count);
public class MyClass
{
public volatile int Count;
public void AddCount()
{
Count++;
}
}
上記の出力は必ず 100000 未満になります。これは、共有リソースへのアクセス時にロックが行われず、出力結果が期待通りにならないためです。マルチスレッドアクセスのシーンでは、[Interlocked](https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.interlocked)
または[lock](https://learn.microsoft.com/zh-cn/dotnet/csharp/language-reference/statements/lock)
文を使用して共有リソースへのアクセスを保護します:
// 上記のコードを次のいずれかの方法で変更します
// 方法1
public void AddCount()
{
Interlocked.Add(ref Count, 1);
//Count++;
}
// 方法2
Parallel.For(0, 100000, i =>
{
lock (mc)
{
mc.AddCount();
}
});
ただし、これにより新たな問題が発生します。ロックの存在により、システムオーバーヘッド(CPU 時間とメモリ)、スレッド切り替え時間などが増加します。つまり、ループ本体のすべてのコードをロックする必要がある場合、並列計算を使用する必要はありません。なぜなら、これにより直列計算よりも遅くなるからです。
PLINQ#
従来のLINQは単一スレッドで直列に実行されますが、PLINQは LINQ の並列実装です。つまり、Parallel LINQです。PLINQ の実装はほぼSystem.Linq.ParallelEnumerableクラスにあり、その実行モードは以下の通りです:PLINQ 介绍 | Microsoft Learn。要約すると、PLINQ は内部で分析結果に基づいて並列または直列実行を選択し、最適なクエリ速度を得ることができます。例えば:
var list = Enumerable.Range(1, 6);
var query = from i in list
select i;
foreach (var i in query)
{
Console.WriteLine(i);
}
Console.WriteLine("----------");
var query2 = from i in list.AsParallel()
select i;
foreach (var i in query2)
{
Console.WriteLine(i);
}
LINQ は順序で出力されますが、PLINQ は無秩序で出力されます(並行してマルチスレッド)。
実際の開発では、並列が常に直列よりも速いわけではなく、使用シーンに応じて最適な方法を見つける必要があります。
並列プログラミングの例外処理#
以下のコードが例外をスローするかどうかを考えてみてください:
MyMethod();
static async Task MyMethod()
{
await Task.CompletedTask;
throw new Exception();
}
実際には、例外はスローされません。なぜなら、MyMethod は非同期メソッドであり、別のスレッドで実行されて例外が発生しますが、呼び出しスレッド(メインスレッド)と相互作用しないため、呼び出し元は例外が発生したかどうかを知りません。
タスク内の例外処理#
タスクが相互作用できる場合、例えばタスクの Wait、WaitAny、WaitAll などのブロッキングメソッドを呼び出すか、タスクの Result プロパティを取得すると、タスク内で発生した例外をキャッチできます。例外の種類はAggregateException
であり、これは並列プログラミングにおける最上位の例外です。
ブロッキング(同期)で取得できる場合は、タスクの Wait * ブロッキングメソッドを使用するか、await Task を使用します。ブロッキング後、同期コードの例外は try/catch を使用してキャッチする必要があります。
非ブロッキング(非同期)でタスクの例外をキャッチする必要がある場合は、タスクの ContinueWith またはイベント通知を使用します(これは非常に面倒です)。
Parallel 内の例外処理#
タスクに比べて、Parallel の例外処理ははるかに簡単です。なぜなら、Parallel は同期実行であり、つまり主スレッドをブロックするため、内部でスローされた例外は直接主スレッドでキャッチできます:
using System.Collections.Concurrent;
// スレッドセーフなキュー
var exs = new ConcurrentQueue<Exception>();
try
{
Parallel.For(0, 2, i =>
{
try
{
throw new ArgumentException();
}
catch (Exception e)
{
exs.Enqueue(e);
throw new AggregateException(exs);
}
});
}
catch (AggregateException e)
{
foreach (var ex in e.InnerExceptions)
{
Console.WriteLine($"例外の種類:{ex.GetType()}、例外のソース:{ex.Source}、例外のメッセージ:{ex.Message}");
}
}
System.Console.WriteLine(exs.Count);
参考文献#
- https://masuit.com/1201
- https://threads.whuanle.cn/ 上記の記事よりもさらに深く詳細です
- https://learn.microsoft.com/zh-cn/dotnet/standard/parallel-programming/
- https://learn.microsoft.com/zh-cn/dotnet/standard/clr
- マルチスレッド(Multithread) - 随笔分类 - InCerry - 博客园 (cnblogs.com)
- ThreadPool クラス (System.Threading) | Microsoft Learn
- BackgroundWorker クラス (System.ComponentModel) | Microsoft Learn
- Task クラス (System.Threading.Tasks) | Microsoft Learn
- async - C# リファレンス | Microsoft Learn
- await 演算子 - 非同期タスクの完了を待機 | Microsoft Learn
- C# における非同期プログラミング | Microsoft Learn
- タスクベースの非同期パターン(TAP):概要と紹介 | Microsoft Learn
- Parallel クラス (System.Threading.Tasks) | Microsoft Learn
- referencesource/mscorlib/system/threading/Tasks/Parallel.cs at master · microsoft/referencesource · GitHub
- volatile - C# リファレンス | Microsoft Learn
- Interlocked クラス (System.Threading) | Microsoft Learn