複数の名前付きパイプサーバーに、1つのパイプクライアントから同じデータを送る

パイプ関連記事 もくじ
https://tera1707.com/entry/2022/02/06/144447#Pipe

やりたいこと

以前の記事で、名前付きパイプでプロセス間通信のやり方を調べた。

そのパイプの記事では、1つのパイプサーバーに対して、1つのパイプクライアントから文字列を送る、ということをしたのだが、今回、複数のパイプサーバーに対して、1つのパイプクライアントから同じ文字列を送りたくなった。

こんな感じ。

やり方

どうにも、MSの公式なやり方?は見つけられなかった。

で、自分であれこれ実験してると、

同じ名前のパイプを開くパイプサーバーを複数立ち上げてるときに、1つのパイプクライアントに、何回もConnect→送信、を繰り返していると、サーバー1~4に、順番に送信できてる気がする?となってきた。

そんな動きするか?とネットを調べていると、下記のようなstackoverflowを発見した。

Async NamedPipes in case of multiple pipe server instances

https://stackoverflow.com/questions/24701410/async-namedpipes-in-case-of-multiple-pipe-server-instances/25968108#25968108

どうも、そういう動きするらしい。

で、自分の実験コードとstackoverflowの内容をもとに、実験コードを改造してみた。

実験コード

ほぼほぼ前回の実験コードと同じだが、//★の部分だけ変えた。それ以外は変わってない。

画面側のコードやInterfaceも前回から変化なし。

using System.Diagnostics;
using System.IO.Pipes;
using System.Security.Principal;

namespace PipeJikken
{
    public class PipeConnect :IDisposable, IPipeConnect
    {
        private static readonly int RecvPipeThreadMax = -1;//★サーバーのスレッド数の上限を、-1(接続できる最大数)にする
        private CancellationTokenSource _lifetimeCts = new CancellationTokenSource();
        private bool disposedValue;

        // パイプから受信を行う処理
        // 送信(クライアント)側が切断(close)すると、IOExceptionが来るので再度パイプサーバー作成しなおしする。
        public Task CreateServerAsync(string pipeName, Action<string> onRecv, CancellationToken ct = default)
        {
            var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, _lifetimeCts.Token);

            return Task.Run(async () =>
            {
                while (true)
                {
                    try
                    {
                        // 同じパイプに対しての接続は1件まで
                        using (var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, RecvPipeThreadMax, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly))
                        {
                            // クライアントの接続待ち
                            ConsoleWriteLine($"受信:クライアントの接続待ち開始");
                            await pipeServer.WaitForConnectionAsync(combinedCts.Token);

                            ConsoleWriteLine($"受信:StreamReader");
                            using (var reader = new StreamReader(pipeServer))
                            {
                                // 受信待ち
                                ConsoleWriteLine($"受信:読み込み開始");
                                var recvString = await reader.ReadLineAsync();

                                ConsoleWriteLine($"受信:受信文字列:{recvString ?? "null"}");

                                onRecv.Invoke(recvString ?? "null");
                            }
                        }
                    }
                    catch (IOException ofex)
                    {
                        // クライアントが切断
                        ConsoleWriteLine("受信:クライアント側が切断しました");
                        ConsoleWriteLine(ofex.Message);
                    }
                    catch (OperationCanceledException oce)
                    {
                        // パイプサーバーのキャンセル要求(OperationCanceledExceptionをthrowしてTaskが終わると、Taskは「Cancel」扱いになる)
                        ConsoleWriteLine($"受信:パイプサーバーのキャンセル要求がきました。{oce.GetType()}");
                        throw;
                    }
                    finally
                    {
                        ConsoleWriteLine("受信:パイプ終了");
                    }
                }
            });
        }

        // パイプに対して送信を行う処理
        // 1件送信するごとに、パイプ接続→切断するタイプ。
        public async Task CreateClientAsync(string pipeName, string writeString)
        {
            await Task.Run(async () =>
            {
                try
                {
                    // ★まず一発文字列をおくり、同時にサーバー数を取ってくる(下に書いたメソッド)
                    var serverCount = await GetServerCountAndConnectAndSend(pipeName, writeString);

                    // ★GetServerCountAndConnectAndSendで1回データ送ってるので、サーバーの数-1回、forで送る
                    for (int i = 1; i < serverCount; i++)
                    {
                        await GetServerCountAndConnectAndSend(pipeName, writeString);
                    }
                }
                catch (TimeoutException te)
                {
                    ConsoleWriteLine(te.Message);
                }

                ConsoleWriteLine(" 送信:パイプ終了");
            });
        }

        // ★Connectしてパイプサーバーの数をカウントしつつ、文字列を送信する
        private async Task<int> GetServerCountAndConnectAndSend(string pipeName, string writeString)
        {
            int serverCount = 0;

            using (var pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly, TokenImpersonationLevel.Impersonation))
            {
                await pipeClient.ConnectAsync(1000);

                serverCount = pipeClient.NumberOfServerInstances;

                using (var writer = new StreamWriter(pipeClient))
                {
                    await writer.WriteLineAsync(writeString);
                    writer.Flush();

                    ConsoleWriteLine(" 送信完了");
                }
            }

            return serverCount;
        }

        private static void ConsoleWriteLine(string log)
        {
            Debug.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} {log}");
        }

        #region IDisposable

        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    // TODO: マネージド状態を破棄します (マネージド オブジェクト)
                    _lifetimeCts.Cancel();
                }

                // TODO: アンマネージド リソース (アンマネージド オブジェクト) を解放し、ファイナライザーをオーバーライドします
                // TODO: 大きなフィールドを null に設定します
                disposedValue = true;
            }
        }

        // // TODO: 'Dispose(bool disposing)' にアンマネージド リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします
        // ~PipeConnect()
        // {
        //     // このコードを変更しないでください。クリーンアップ コードを 'Dispose(bool disposing)' メソッドに記述します
        //     Dispose(disposing: false);
        // }

        public void Dispose()
        {
            // このコードを変更しないでください。クリーンアップ コードを 'Dispose(bool disposing)' メソッドに記述します
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }

        #endregion
    }
}

注意ポイント

前回の実験コードではRecvPipeThreadMaxの値を1にしていた。

今回作成したコードで、もしRecvPipeThreadMaxを1のままにしていると、pipeClient.NumberOfServerInstancesの値が1になってしまう。

その場合でも、何度もConnectと送信を繰り返すと、すべてのサーバーにデータを送れるのだが(つまり、前回の実験コードのままで、何度も送ったときと同じ動作)、パイプサーバー数がRecvPipeThreadMaxを見てもわからないので、何回送ればよいかがわからなくなってしまう。

なので、RecvPipeThreadMaxは-1(もしくは、同時に立ち上げるつもりのサーバー数より大きい数)にしておく。

参考

前回の実験コード

https://tera1707.com/entry/2023/04/30/004032

スタックオーバーフロー
Async NamedPipes in case of multiple pipe server instances

https://stackoverflow.com/questions/24701410/async-namedpipes-in-case-of-multiple-pipe-server-instances/25968108#25968108