Disposing of XmlReader with pending async read

Posted 2019-06-07 13:01

Question:

I'm writing a .NET XMPP library for fun. As has been discussed elsewhere, the XmlReader implementation in versions prior to .NET 4.5 was not suitable for parsing XML from a NetworkStream, because it would not begin parsing until it had filled an internal 4KB buffer or reached EOF.

Other libraries got around this by not using XmlReader at all. As mentioned in the previously linked question, jabber-net uses a port of a Java XML parser. An implementation I found while searching, Babel IM, uses its own simple XML parser. I'm not sure what agsXMPP does.

However, with the release of .NET 4.5 and the new async features, XmlReader apparently got an upgrade and can now do true async parsing. I've therefore used it to hack together a fairly simple XMPP client that can connect to a server and send and receive messages.

The sticking point, however, seems to be disconnecting from the server. On disconnect I would normally just Dispose() my XmlReader instance and the underlying streams. However, Dispose() throws an InvalidOperationException with the message "An asynchronous operation is already in progress." if it is called while an async operation is pending. Because of the nature of XMPP, my XmlReader is almost constantly performing an async operation as it waits for XML stanzas from the server to come down the pipe.

There do not appear to be any methods on the XmlReader that I could use to tell it to cancel any pending async operations so that I can Dispose() of it cleanly. Is there a better way to deal with this situation than simply not attempting to dispose of the XmlReader? The XMPP spec states that the server is supposed to send a closing </stream:stream> tag on disconnect. I could use this as a signal to not attempt to perform another async read as nothing else should be coming down the pipe, but there's no guarantee of this.
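For illustration, the "closing tag as a stop signal" idea could look roughly like the sketch below. The class and method names are hypothetical, and it assumes the closing tag really does arrive; as noted above, nothing guarantees that. The loop stops issuing reads once the root element closes, so no async operation is pending when Dispose() runs.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Xml;

// Hypothetical helper, for illustration only.
static class StopAtRootEnd
{
    // Reads nodes until the root element's end tag is seen (or ReadAsync returns false),
    // then disposes the reader. Returns the number of nodes read.
    public static async Task<int> ReadUntilRootClosesAsync(XmlReader reader)
    {
        int nodes = 0;

        try
        {
            while (await reader.ReadAsync())
            {
                nodes++;

                // The root element and its end tag are both reported at Depth 0.
                if (reader.NodeType == XmlNodeType.EndElement && reader.Depth == 0)
                {
                    break; // do not start another ReadAsync
                }
            }
        }
        finally
        {
            // Safe here: the loop has exited, so no read is in flight.
            reader.Dispose();
        }

        return nodes;
    }
}
```

This only covers the well-behaved case; if the server drops the connection without sending the closing tag, the last ReadAsync() is still left pending.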

Here is some sample code to play with. LongLivedTextStream basically emulates an open NetworkStream in that it never reaches EOF and will block until at least 1 byte can be read. You can "inject" XML text into it which the XmlReader will happily parse, but trying to dispose of the reader will trigger the aforementioned exception.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    class LongLivedTextStream : Stream
    {
        // Auto-reset so a Set() that lands just before WaitOne() is latched, not lost.
        AutoResetEvent moarDatas = new AutoResetEvent(false);

        List<byte> data = new List<byte>();
        int pos = 0;

        public void Inject(string text)
        {
            // Inject() and Read() run on different threads, so guard the list.
            lock (data)
            {
                data.AddRange(new UTF8Encoding(false).GetBytes(text));
            }

            moarDatas.Set();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (count == 0) { return 0; }

            // Block until at least one byte is available, like an open NetworkStream.
            while (true)
            {
                lock (data)
                {
                    int available = Math.Min(count, data.Count - pos);

                    if (available > 0)
                    {
                        data.CopyTo(pos, buffer, offset, available);
                        pos += available;

                        return available;
                    }
                }

                moarDatas.WaitOne();
            }
        }

        #region Other Stream Members

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush() { }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        #endregion
    }

    public class Program
    {
        public static void Main(string[] args)
        {
            Test();
            Console.ReadLine();
        }

        public static async void Test()
        {
            var stream = new LongLivedTextStream();
            var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

            var t = Task.Run(() =>
                {
                    stream.Inject("<root>");
                    Thread.Sleep(2000);
                    stream.Inject("value");
                    Thread.Sleep(2000);
                    stream.Inject("</root>");
                    Thread.Sleep(2000);

                    reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."

                    Console.WriteLine("Disposed");
                });

            while (await reader.ReadAsync())
            {
                bool kill = false;

                switch (reader.NodeType)
                {
                    case XmlNodeType.Element:
                        Console.WriteLine("Start: " + reader.LocalName);
                        break;

                    case XmlNodeType.EndElement:
                        Console.WriteLine("End:   " + reader.LocalName);
                        //kill = true; // I could use a particular EndElement as a signal to not try another read
                        break;

                    case XmlNodeType.Text:
                        Console.WriteLine("Text:  " + await reader.GetValueAsync());
                        break;
                }

                if (kill) { break; }
            }
        }
    }
}

EDIT

This example uses an actual NetworkStream and shows that if I Close() or Dispose() the underlying stream, the ReadAsync() call on XmlReader does not return false as hoped; instead, it continues to block.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    public class Program
    {
        public static void Main(string[] args)
        {
            NetworkStream stream = null;

            var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);

            var serverIsUp = new ManualResetEvent(false);
            var doneWriting = new ManualResetEvent(false);

            var t1 = Task.Run(() =>
            {
                var server = new TcpListener(endpoint);
                server.Start();

                serverIsUp.Set();

                var client = server.AcceptTcpClient();

                var writer = new StreamWriter(client.GetStream());

                writer.Write("<root>"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("value"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("</root>"); writer.Flush();
                Thread.Sleep(2000);

                doneWriting.Set();
            });

            var t2 = Task.Run(() =>
                {
                    doneWriting.WaitOne();

                    stream.Dispose();

                    Console.WriteLine("Disposed of Stream");
                });

            var t3 = Task.Run(async () =>
            {
                serverIsUp.WaitOne();

                var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                socket.Connect(endpoint);

                stream = new NetworkStream(socket, true);

                var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

                while (await reader.ReadAsync())
                {
                    bool kill = false;

                    switch (reader.NodeType)
                    {
                        case XmlNodeType.Element:
                            Console.WriteLine("Start: " + reader.LocalName);
                            break;

                        case XmlNodeType.EndElement:
                            Console.WriteLine("End:   " + reader.LocalName);
                            //kill = true; // I could use a particular EndElement as a signal to not try another read
                            break;

                        case XmlNodeType.Text:
                            Console.WriteLine("Text:  " + await reader.GetValueAsync());
                            break;
                    }

                    if (kill) { break; }
                }

                // Ideally once the underlying stream is closed, ReadAsync() would return false
                // we would get here and could safely dispose the reader, but that's not the case
                // ReadAsync() continues to block
                reader.Dispose();
                Console.WriteLine("Disposed of Reader");
            });

            Console.ReadLine();
        }
    }
}

Answer 1:

Try injecting a manual </stream:stream> into the parser. To do this, you may need an adapter class between the NetworkStream and the parser, which passes all of the incoming data to the parser but adds another method to inject the </stream:stream>. You'll need to be careful that you're not in the middle of another stanza when you call that method, perhaps by keeping state on the output side of the parser.
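A minimal sketch of such an adapter might look like the following. Everything here (the InjectableStream name, InjectAndEnd, the 4 KB scratch buffer) is a hypothetical illustration rather than an existing API. It assumes .NET 4.6 or later for TaskCreationOptions.RunContinuationsAsynchronously and a single reader at a time. Once text is injected, the adapter stops reading from the inner stream, delivers the injected bytes, and then reports EOF.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical adapter; the class and member names are illustrative only.
sealed class InjectableStream : Stream
{
    private readonly Stream inner;
    private readonly byte[] scratch = new byte[4096];
    // Completed by InjectAndEnd so a pending ReadAsync can wake up without inner data.
    private readonly TaskCompletionSource<byte[]> injected =
        new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
    private byte[] tail;   // injected bytes, once the injection has been observed
    private int tailPos;

    public InjectableStream(Stream inner) { this.inner = inner; }

    // Queue text (for example "</stream:stream>") to be delivered next, followed by EOF.
    // The caller must make sure the parser is not in the middle of a stanza.
    public void InjectAndEnd(string text)
    {
        injected.TrySetResult(Encoding.UTF8.GetBytes(text));
    }

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
                                              CancellationToken cancellationToken)
    {
        if (tail == null && !injected.Task.IsCompleted)
        {
            // Read into a private buffer so an abandoned inner read never
            // writes into the caller's buffer later on.
            Task<int> innerRead = inner.ReadAsync(
                scratch, 0, Math.Min(count, scratch.Length), cancellationToken);
            Task winner = await Task.WhenAny((Task)innerRead, injected.Task).ConfigureAwait(false);

            if (winner == innerRead)
            {
                int n = await innerRead.ConfigureAwait(false);
                Buffer.BlockCopy(scratch, 0, buffer, offset, n);
                return n;
            }
        }

        // Injection has happened: serve the injected bytes, then 0 (EOF).
        if (tail == null) { tail = await injected.Task.ConfigureAwait(false); }
        int m = Math.Min(count, tail.Length - tailPos);
        Buffer.BlockCopy(tail, tailPos, buffer, offset, m);
        tailPos += m;
        return m;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override void Flush() { }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    protected override void Dispose(bool disposing)
    {
        if (disposing) { inner.Dispose(); }
        base.Dispose(disposing);
    }
}
```

The idea would be to pass the adapter to XmlReader.Create() in place of the NetworkStream, call InjectAndEnd("</stream:stream>") on disconnect, and dispose the reader only after the read loop has exited. Any data from the server that has not yet been read is dropped in this sketch, and if the injection lands in the middle of a stanza the reader will most likely throw an XmlException instead of finishing cleanly.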