パイプ関連記事 もくじ
https://tera1707.com/entry/2022/02/06/144447#Pipe
やりたいこと
以前の記事で、名前付きパイプでプロセス間通信のやり方を調べた。
そのパイプの記事では、1つのパイプサーバーに対して、1つのパイプクライアントから文字列を送る、ということをしたのだが、今回、複数のパイプサーバーに対して、1つのパイプクライアントから同じ文字列を送りたくなった。
こんな感じ。
やり方
どうにも、MSの公式なやり方?は見つけられなかった。
で、自分であれこれ実験してると、
同じ名前のパイプを開くパイプサーバーを複数立ち上げてるときに、1つのパイプクライアントに、何回もConnect→送信、を繰り返していると、サーバー1~4に、順番に送信できてる気がする?となってきた。
そんな動きするか?とネットを調べていると、下記のようなstackoverflowを発見した。
Async NamedPipes in case of multiple pipe server instances
どうも、そういう動きするらしい。
で、自分の実験コードとstackoverflowの内容をもとに、実験コードを改造してみた。
実験コード
ほぼほぼ前回の実験コードと同じだが、//★
の部分だけ変えた。それ以外は変わってない。
画面側のコードやInterfaceも前回から変化なし。
using System.Diagnostics; using System.IO.Pipes; using System.Security.Principal; namespace PipeJikken { public class PipeConnect :IDisposable, IPipeConnect { private static readonly int RecvPipeThreadMax = -1;//★サーバーのスレッド数の上限を、-1(接続できる最大数)にする private CancellationTokenSource _lifetimeCts = new CancellationTokenSource(); private bool disposedValue; // パイプから受信を行う処理 // 送信(クライアント)側が切断(close)すると、IOExceptionが来るので再度パイプサーバー作成しなおしする。 public Task CreateServerAsync(string pipeName, Action<string> onRecv, CancellationToken ct = default) { var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, _lifetimeCts.Token); return Task.Run(async () => { while (true) { try { // 同じパイプに対しての接続は1件まで using (var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, RecvPipeThreadMax, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly)) { // クライアントの接続待ち ConsoleWriteLine($"受信:クライアントの接続待ち開始"); await pipeServer.WaitForConnectionAsync(combinedCts.Token); ConsoleWriteLine($"受信:StreamReader"); using (var reader = new StreamReader(pipeServer)) { // 受信待ち ConsoleWriteLine($"受信:読み込み開始"); var recvString = await reader.ReadLineAsync(); ConsoleWriteLine($"受信:受信文字列:{recvString ?? "null"}"); onRecv.Invoke(recvString ?? "null"); } } } catch (IOException ofex) { // クライアントが切断 ConsoleWriteLine("受信:クライアント側が切断しました"); ConsoleWriteLine(ofex.Message); } catch (OperationCanceledException oce) { // パイプサーバーのキャンセル要求(OperationCanceledExceptionをthrowしてTaskが終わると、Taskは「Cancel」扱いになる) ConsoleWriteLine($"受信:パイプサーバーのキャンセル要求がきました。{oce.GetType()}"); throw; } finally { ConsoleWriteLine("受信:パイプ終了"); } } }); } // パイプに対して送信を行う処理 // 1件送信するごとに、パイプ接続→切断するタイプ。 public async Task CreateClientAsync(string pipeName, string writeString) { await Task.Run(async () => { try { // ★まず一発文字列をおくり、同時にサーバー数を取ってくる(下に書いたメソッド) var serverCount = await GetServerCountAndConnectAndSend(pipeName, writeString); // ★GetServerCountAndConnectAndSendで1回データ送ってるので、サーバーの数-1回、forで送る for (int i = 1; i < serverCount; i++) { await GetServerCountAndConnectAndSend(pipeName, writeString); } } catch (TimeoutException te) { ConsoleWriteLine(te.Message); } ConsoleWriteLine(" 送信:パイプ終了"); }); } // ★Connectしてパイプサーバーの数をカウントしつつ、文字列を送信する private async Task<int> GetServerCountAndConnectAndSend(string pipeName, string writeString) { int serverCount = 0; using (var pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly, TokenImpersonationLevel.Impersonation)) { await pipeClient.ConnectAsync(1000); serverCount = pipeClient.NumberOfServerInstances; using (var writer = new StreamWriter(pipeClient)) { await writer.WriteLineAsync(writeString); writer.Flush(); ConsoleWriteLine(" 送信完了"); } } return serverCount; } private static void ConsoleWriteLine(string log) { Debug.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} {log}"); } #region IDisposable protected virtual void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { // TODO: マネージド状態を破棄します (マネージド オブジェクト) _lifetimeCts.Cancel(); } // TODO: アンマネージド リソース (アンマネージド オブジェクト) を解放し、ファイナライザーをオーバーライドします // TODO: 大きなフィールドを null に設定します disposedValue = true; } } // // TODO: 'Dispose(bool disposing)' にアンマネージド リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします // ~PipeConnect() // { // // このコードを変更しないでください。クリーンアップ コードを 'Dispose(bool disposing)' メソッドに記述します // Dispose(disposing: false); // } public void Dispose() { // このコードを変更しないでください。クリーンアップ コードを 'Dispose(bool disposing)' メソッドに記述します Dispose(disposing: true); GC.SuppressFinalize(this); } #endregion } }
注意ポイント
前回の実験コードではRecvPipeThreadMax
の値を1にしていた。
今回作成したコードで、もしRecvPipeThreadMax
を1のままにしていると、pipeClient.NumberOfServerInstances
の値が1になってしまう。
その場合でも、何度もConnectと送信を繰り返すと、すべてのサーバーにデータを送れるのだが(つまり、前回の実験コードのままで、何度も送ったときと同じ動作)、パイプサーバー数がRecvPipeThreadMax
を見てもわからないので、何回送ればよいかがわからなくなってしまう。
なので、RecvPipeThreadMax
は-1(もしくは、同時に立ち上げるつもりのサーバー数より大きい数)にしておく。
参考
前回の実験コード
https://tera1707.com/entry/2023/04/30/004032
スタックオーバーフロー
Async NamedPipes in case of multiple pipe server instances