的XmlReader与未决异步读处置(Disposing of XmlReader with pen

2019-09-21 08:39发布

我正在写一个.NET XMPP库乐趣,如已经讨论的其他地方的XmlReader版本中实施之前.NET 4.5不适合用于从XML解析NetworkStream ,因为它不会开始解析直到它填补了一个内部的4KB缓冲区或达到EOF。

其他库不使用解决此得到XmlReader可言。 正如在以前链接的问题中提到,叽里咕噜网使用Java的XML解析器的端口。 我发现,当搜寻的实现,巴贝尔IM,使用它自己的简单的XML解析器 。 我不知道agsXMPP做什么。

然而,随着.NET 4.5的发布和新的异步特性XmlReader显然得到了升级,现在可以做真正的异步解析 。 因此,我用它来破解起来相当简单的XMPP客户端可以连接到服务器并发送和接收消息。

症结然而,实际上似乎是在从服务器断开连接 。 在断开我通常只是想Dispose()我的XmlReader的实例和底层流。 然而, Dispose()实际上将引发InvalidOperationException消息“异步操作已在进行中。” 如果你把它当一个异步......好消息的话。 然而,由于XMPP的性质,我XmlReader基本上是不断进行的异步操作,因为它等待来自服务器的XML节下来的管道。

有没有出现任何方法上XmlReader ,我可以用它来告诉它取消了所有待异步操作,这样我可以Dispose()它干净。 有没有更好的方式来处理这种情况不仅仅是没有试图处置的XmlReader 在XMPP 规范指出服务器应该发送一个关闭</stream:stream>标签上断开。 我可以用这个作为一个信号,不要试图执行另一异步读作不出意外应该被灌进管道,但有没有这方面的保证。

下面是一些示例代码一起玩。 LongLivedTextStream基本上模拟了一个开放NetworkStream ,因为它没有达到EOF并且将阻塞,直到至少1个字节可以被读取。 你可以“注入” XML文本到它其中XmlReader将愉快地分析,而是试图处置读者会引发上述异常。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    class LongLivedTextStream : Stream
    {
        ManualResetEvent moarDatas = new ManualResetEvent(false);

        List<byte> data = new List<byte>();
        int pos = 0;

        public void Inject(string text)
        {
            data.AddRange(new UTF8Encoding(false).GetBytes(text));

            moarDatas.Set();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            var bytes = GetBytes(count).ToArray();

            for (int i = 0; offset + i < buffer.Length && i < bytes.Length; i++)
            {
                buffer[offset + i] = bytes[i];
            }

            return bytes.Length;
        }

        private IEnumerable<byte> GetBytes(int count)
        {
            int returned = 0;

            while (returned == 0)
            {
                if (pos < data.Count)
                {
                    while (pos < data.Count && returned < count)
                    {
                        yield return data[pos];

                        pos += 1; returned += 1;
                    }
                }
                else
                {
                    moarDatas.Reset();
                    moarDatas.WaitOne();
                }
            }
        }

        #region Other Stream Members

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush() { }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        #endregion
    }

    public class Program
    {
        public static void Main(string[] args)
        {
            Test();
            Console.ReadLine();
        }

        public static async void Test()
        {
            var stream = new LongLivedTextStream();
            var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

            var t = Task.Run(() =>
                {
                    stream.Inject("<root>");
                    Thread.Sleep(2000);
                    stream.Inject("value");
                    Thread.Sleep(2000);
                    stream.Inject("</root>");
                    Thread.Sleep(2000);

                    reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."

                    Console.WriteLine("Disposed");
                });

            while (await reader.ReadAsync())
            {
                bool kill = false;

                switch (reader.NodeType)
                {
                    case XmlNodeType.Element:
                        Console.WriteLine("Start: " + reader.LocalName);
                        break;

                    case XmlNodeType.EndElement:
                        Console.WriteLine("End:   " + reader.LocalName);
                        //kill = true; // I could use a particular EndElement as a signal to not try another read
                        break;

                    case XmlNodeType.Text:
                        Console.WriteLine("Text:  " + await reader.GetValueAsync());
                        break;
                }

                if (kill) { break; }
            }
        }
    }
}

编辑

此示例使用实际NetworkStream ,并表明如果我Close()Dispose()的基础流的ReadAsync()的调用XmlReader不返回的希望假,而是继续阻止。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    public class Program
    {
        public static void Main(string[] args)
        {
            NetworkStream stream = null;

            var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);                                   

            var serverIsUp = new ManualResetEvent(false);
            var doneWriting = new ManualResetEvent(false);

            var t1 = Task.Run(() =>
            {
                var server = new TcpListener(endpoint);
                server.Start();

                serverIsUp.Set();

                var client = server.AcceptTcpClient();

                var writer = new StreamWriter(client.GetStream());

                writer.Write("<root>"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("value"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("</root>"); writer.Flush();
                Thread.Sleep(2000);

                doneWriting.Set();
            });

            var t2 = Task.Run(() =>
                {
                    doneWriting.WaitOne();

                    stream.Dispose();

                    Console.WriteLine("Disposed of Stream");
                });

            var t3 = Task.Run(async () =>
            {
                serverIsUp.WaitOne();                

                var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                socket.Connect(endpoint);

                stream = new NetworkStream(socket, true);

                var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

                bool val;
                while (val = await reader.ReadAsync())
                {
                    bool kill = false;

                    switch (reader.NodeType)
                    {
                        case XmlNodeType.Element:
                            Console.WriteLine("Start: " + reader.LocalName);
                            break;

                        case XmlNodeType.EndElement:
                            Console.WriteLine("End:   " + reader.LocalName);
                            //kill = true; // I could use a particular EndElement as a signal to not try another read
                            break;

                        case XmlNodeType.Text:
                            Console.WriteLine("Text:  " + await reader.GetValueAsync());
                            break;
                    }

                    if (kill) { break; }
                }

                // Ideally once the underlying stream is closed, ReadAsync() would return false
                // we would get here and could safely dispose the reader, but that's not the case
                // ReadAsync() continues to block
                reader.Dispose();
                Console.WriteLine("Disposed of Reader");
            });

            Console.ReadLine();
        }
    }
}

Answer 1:

尝试注入手册</stream:stream>到解析器。 要做到这一点,可能需要所述的NetworkStream和解析器,它通过了所有输入数据的给分析器,但增加了另一种方法,以注射之间的适配器类</stream:stream> 。 你需要小心,你是不是在另一节的中间,当你调用该方法,或许通过保持解析器的输出端的状态。



文章来源: Disposing of XmlReader with pending async read