名前付きパイプでプロセス間の双方向通信を行う(最終版)

パイプ関連記事 もくじ
https://tera1707.com/entry/2022/02/06/144447#Pipe

やりたいこと

以前の記事で、パイプの受信用タスクがキャンセルできるようにした。

どうせ閉じるなら、DisposableにしてDisposeしたときに勝手にキャンセルするようにしたい。また、ついでに動作確認のためのUnitTestを作りたい。さらについでにこのクラス自体のinterfaceをつくりたい。

最終版のサンプルプログラム

Dispose時に自分をキャンセルするようにした。

https://github.com/tera1707/PipeJikken

pipeconnect.cs

using System.Diagnostics;
using System.IO.Pipes;
using System.Security.Principal;

namespace PipeJikken
{
    public class PipeConnect :IDisposable, IPipeConnect
    {
        private static readonly int RecvPipeThreadMax = 1;
        private CancellationTokenSource _lifetimeCts = new CancellationTokenSource();
        private bool disposedValue;

        // パイプから受信を行う処理
        // 送信(クライアント)側が切断(close)すると、IOExceptionが来るので再度パイプサーバー作成しなおしする。
        public Task CreateServerAsync(string pipeName, Action<string> onRecv, CancellationToken ct = default)
        {
            var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, _lifetimeCts.Token);

            return Task.Run(async () =>
            {
                while (true)
                {
                    try
                    {
                        // 同じパイプに対しての接続は1件まで
                        using (var pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, RecvPipeThreadMax, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly))
                        {
                            // クライアントの接続待ち
                            ConsoleWriteLine($"受信:クライアントの接続待ち開始");
                            await pipeServer.WaitForConnectionAsync(combinedCts.Token);

                            ConsoleWriteLine($"受信:StreamReader");
                            using (var reader = new StreamReader(pipeServer))
                            {
                                // 受信待ち
                                ConsoleWriteLine($"受信:読み込み開始");
                                var recvString = await reader.ReadLineAsync();

                                ConsoleWriteLine($"受信:受信文字列:{recvString ?? "null"}");

                                onRecv.Invoke(recvString ?? "null");
                            }
                        }
                    }
                    catch (IOException ofex)
                    {
                        // クライアントが切断
                        ConsoleWriteLine("受信:クライアント側が切断しました");
                        ConsoleWriteLine(ofex.Message);
                    }
                    catch (OperationCanceledException oce)
                    {
                        // パイプサーバーのキャンセル要求(OperationCanceledExceptionをthrowしてTaskが終わると、Taskは「Cancel」扱いになる)
                        ConsoleWriteLine($"受信:パイプサーバーのキャンセル要求がきました。{oce.GetType()}");
                        throw;
                    }
                    finally
                    {
                        ConsoleWriteLine("受信:パイプ終了");
                    }
                }
            });
        }

        // パイプに対して送信を行う処理
        // 1件送信するごとに、パイプ接続→切断するタイプ。
        public async Task CreateClientAsync(string pipeName, string writeString)
        {
            await Task.Run(async () =>
            {
                using (var pipeClient = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly, TokenImpersonationLevel.Impersonation))
                {
                    await pipeClient.ConnectAsync(1000);

                    using (var writer = new StreamWriter(pipeClient))
                    {
                        await writer.WriteLineAsync(writeString);
                        writer.Flush();

                        ConsoleWriteLine(" 送信完了");
                    }
                }

                ConsoleWriteLine(" 送信:パイプ終了");
            });
        }

        private static void ConsoleWriteLine(string log)
        {
            Debug.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} {log}");
        }

        #region IDisposable

        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    // TODO: マネージド状態を破棄します (マネージド オブジェクト)
                    _lifetimeCts.Cancel();
                }

                // TODO: アンマネージド リソース (アンマネージド オブジェクト) を解放し、ファイナライザーをオーバーライドします
                // TODO: 大きなフィールドを null に設定します
                disposedValue = true;
            }
        }

        // // TODO: 'Dispose(bool disposing)' にアンマネージド リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします
        // ~PipeConnect()
        // {
        //     // このコードを変更しないでください。クリーンアップ コードを 'Dispose(bool disposing)' メソッドに記述します
        //     Dispose(disposing: false);
        // }

        public void Dispose()
        {
            // このコードを変更しないでください。クリーンアップ コードを 'Dispose(bool disposing)' メソッドに記述します
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }

        #endregion
    }
}

IPipeConnect,cs

namespace PipeJikken
{
    public interface IPipeConnect
    {
        Task CreateServerAsync(string pipeName, Action<string> onRecv, CancellationToken ct = default);
        Task CreateClientAsync(string pipeName, string writeString);
    }
}

動作確認用UnitTest

using PipeJikken;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace TestProject
{
    [TestClass]
    public class UnitTest
    {
        string pipeName = "MyPipe1";

        [TestMethod]
        [Description("送信した文字列と受信した文字列が一致することの確認")]
        public async Task TestMethod1()
        {
            var _cancelServer = new CancellationTokenSource();
            var _cancelClient = new CancellationTokenSource();

            var sendString = "送る文字列";
            var recvString = string.Empty;

            using (var pipe = new PipeConnect())
            {
                Debug.WriteLine("-----実験開始-----");

                // 受信Pipeサーバー立ち上げ
                _ = pipe.CreateServerAsync(pipeName, OnRecv, _cancelServer.Token);

                // 送信
                await pipe.CreateClientAsync(pipeName, sendString);
                await Task.Delay(2000);

                Assert.AreEqual(sendString, recvString);
            }

            void OnRecv(string s)
            {
                recvString = s;
            }
        }

        [TestMethod]
        [Description("受信タスクがキャンセルできることの確認")]
        public async Task TestMethod2()
        {
            var _cancelServer = new CancellationTokenSource();
            var _cancelClient = new CancellationTokenSource();

            var sendString = "送る文字列";
            var recvString = string.Empty;

            using (var pipe = new PipeConnect())
            {
                var recvTask = pipe.CreateServerAsync(pipeName, ((recvString) => { }), _cancelServer.Token);

                await pipe.CreateClientAsync(pipeName, sendString);

                await Task.Delay(1000);

                _cancelServer.Cancel();

                await Task.Delay(1000);

                // キャンセルした後の送信ができないことを見る
                await Assert.ThrowsExceptionAsync<TimeoutException>(() => pipe.CreateClientAsync(pipeName, "AAA"));

                //recvTaskの例外がOperationCancelledExceptionであることの確認
                try
                {
                    await Task.WhenAll(recvTask);
                }
                catch (OperationCanceledException)
                {
                    Assert.AreEqual(true, recvTask.IsCanceled);
                }
            }
        }

        [TestMethod]
        [Description("Disposeで受信タスクがキャンセルされることの確認")]
        public async Task TestMethod3()
        {
            var _cancelServer = new CancellationTokenSource();
            var _cancelClient = new CancellationTokenSource();

            var sendString = "送る文字列";
            var recvString = string.Empty;

            var pipe = new PipeConnect();
            
            var recvTask = pipe.CreateServerAsync(pipeName, ((recvString) => { }), _cancelServer.Token);

            await pipe.CreateClientAsync(pipeName, sendString);
            
            await Task.Delay(1000);

            pipe.Dispose();

            await Task.Delay(1000);

            // キャンセルした後の送信ができないことを見る
            await Assert.ThrowsExceptionAsync<TimeoutException>(() => pipe.CreateClientAsync(pipeName, "AAA"));

            //recvTaskの例外がOperationCancelledExceptionであることの確認
            try
            {
                await Task.WhenAll(recvTask);
            }
            catch (OperationCanceledException)
            {
                Assert.AreEqual(true, recvTask.IsCanceled);
            }
        }


        [TestMethod]
        [Description("連続して送信した場合に受け取れるか確認")]
        public async Task TestMethod4()
        {
            var recvList = new ConcurrentBag<string>();
            var _cancelServer = new CancellationTokenSource();
            var _cancelClient = new CancellationTokenSource();

            var sendString = "送る文字列";
            var recvString = string.Empty;

            using var pipe = new PipeConnect();

            // 受信Pipeサーバー立ち上げ
            _ = pipe.CreateServerAsync(pipeName, OnRecv, _cancelServer.Token);

            for (int i = 0; i < 100; i++)
            {
                // 送信
                await pipe.CreateClientAsync(pipeName, sendString + i.ToString());
            }

            for (int i = 0; i < 100; i++)
            {
                var s = sendString + i.ToString();// 送ったはずの文字列
                if (!recvList.Contains(s))
                {
                    Assert.Fail("送った文字列が、受信できていませんでした → " + s);
                }
            }

            Debug.WriteLine("OK");

            void OnRecv(string s)
            {
                recvList.Add(s);
            }
        }
    }
}

実験用WPFアプリ

using PipeJikken;
using System.Diagnostics;
using System.Windows;

namespace PipeJikkenWPF
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        PipeConnect? _pc;

        public MainWindow()
        {
            InitializeComponent();

            _pc = new PipeConnect();
        }

        // サーバー起動
        private void Button_Click(object sender, RoutedEventArgs e)
        {
            var id = Process.GetCurrentProcess().SessionId;

            _pc?.CreateServerAsync(@"_pipename_" + id, (data =>
            //_pc?.CreateServerAsync(@"_pipename_", (data => 
            {
                this.Dispatcher.Invoke(() => { DataList.Items.Add(data); });
            }));
        }

        // クライアントで送信
        private void Button_Click_1(object sender, RoutedEventArgs e)
        {
            var id = Process.GetCurrentProcess().SessionId;

            var send = SendData.Text;
            _pc?.CreateClientAsync(@"_pipename_" + id, send);
            //_pc?.CreateClientAsync(@"_pipename_", send);
        }
    }
}

気づいた点(複数ユーザーが同じ名前のパイプを使うとうまく動かない?)

パイプサーバーを起動しているときに、ログインしたまま別のユーザーに切り替えて、 そっちのユーザーから同じ名前のパイプにクライアントとして送信すると、

await pipeClient.ConnectAsync(1000);

でクライアントがサーバーに接続しに行くところでブロックされてしまいそれ以降動かなくなった。
タイムアウトもしてくれない。)

なので、上のコードでは、複数ユーザーが同じ名前のパイプを使ってしまわないように、パイプ名の後に、自分のログインしているユーザーのセッションIDを付けて、重複を防いだ。