我正在写一个.NET XMPP库乐趣,如已经讨论的其他地方的XmlReader
版本中实施之前.NET 4.5不适合用于从XML解析NetworkStream
,因为它不会开始解析直到它填补了一个内部的4KB缓冲区或达到EOF。
其他库不使用解决此得到XmlReader
可言。 正如在以前链接的问题中提到,叽里咕噜网使用Java的XML解析器的端口。 我发现,当搜寻的实现,巴贝尔IM,使用它自己的简单的XML解析器 。 我不知道agsXMPP做什么。
然而,随着.NET 4.5的发布和新的异步特性XmlReader
显然得到了升级,现在可以做真正的异步解析 。 因此,我用它来破解起来相当简单的XMPP客户端可以连接到服务器并发送和接收消息。
症结然而,实际上似乎是在从服务器断开连接 。 在断开我通常只是想Dispose()
我的XmlReader
的实例和底层流。 然而, Dispose()
实际上将引发InvalidOperationException
消息“异步操作已在进行中。” 如果你把它当一个异步......好消息的话。 然而,由于XMPP的性质,我XmlReader
基本上是不断进行的异步操作,因为它等待来自服务器的XML节下来的管道。
有没有出现任何方法上XmlReader
,我可以用它来告诉它取消了所有待异步操作,这样我可以Dispose()
它干净。 有没有更好的方式来处理这种情况不仅仅是没有试图处置的XmlReader
? 在XMPP 规范指出服务器应该发送一个关闭</stream:stream>
标签上断开。 我可以用这个作为一个信号,不要试图执行另一异步读作不出意外应该被灌进管道,但有没有这方面的保证。
下面是一些示例代码一起玩。 LongLivedTextStream
基本上模拟了一个开放NetworkStream
,因为它没有达到EOF并且将阻塞,直到至少1个字节可以被读取。 你可以“注入” XML文本到它其中XmlReader
将愉快地分析,而是试图处置读者会引发上述异常。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Example
{
class LongLivedTextStream : Stream
{
ManualResetEvent moarDatas = new ManualResetEvent(false);
List<byte> data = new List<byte>();
int pos = 0;
public void Inject(string text)
{
data.AddRange(new UTF8Encoding(false).GetBytes(text));
moarDatas.Set();
}
public override int Read(byte[] buffer, int offset, int count)
{
var bytes = GetBytes(count).ToArray();
for (int i = 0; offset + i < buffer.Length && i < bytes.Length; i++)
{
buffer[offset + i] = bytes[i];
}
return bytes.Length;
}
private IEnumerable<byte> GetBytes(int count)
{
int returned = 0;
while (returned == 0)
{
if (pos < data.Count)
{
while (pos < data.Count && returned < count)
{
yield return data[pos];
pos += 1; returned += 1;
}
}
else
{
moarDatas.Reset();
moarDatas.WaitOne();
}
}
}
#region Other Stream Members
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush() { }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
#endregion
}
public class Program
{
public static void Main(string[] args)
{
Test();
Console.ReadLine();
}
public static async void Test()
{
var stream = new LongLivedTextStream();
var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });
var t = Task.Run(() =>
{
stream.Inject("<root>");
Thread.Sleep(2000);
stream.Inject("value");
Thread.Sleep(2000);
stream.Inject("</root>");
Thread.Sleep(2000);
reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."
Console.WriteLine("Disposed");
});
while (await reader.ReadAsync())
{
bool kill = false;
switch (reader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("Start: " + reader.LocalName);
break;
case XmlNodeType.EndElement:
Console.WriteLine("End: " + reader.LocalName);
//kill = true; // I could use a particular EndElement as a signal to not try another read
break;
case XmlNodeType.Text:
Console.WriteLine("Text: " + await reader.GetValueAsync());
break;
}
if (kill) { break; }
}
}
}
}
编辑
此示例使用实际NetworkStream
,并表明如果我Close()
或Dispose()
的基础流的ReadAsync()
的调用XmlReader
不返回的希望假,而是继续阻止。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Example
{
public class Program
{
public static void Main(string[] args)
{
NetworkStream stream = null;
var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);
var serverIsUp = new ManualResetEvent(false);
var doneWriting = new ManualResetEvent(false);
var t1 = Task.Run(() =>
{
var server = new TcpListener(endpoint);
server.Start();
serverIsUp.Set();
var client = server.AcceptTcpClient();
var writer = new StreamWriter(client.GetStream());
writer.Write("<root>"); writer.Flush();
Thread.Sleep(2000);
writer.Write("value"); writer.Flush();
Thread.Sleep(2000);
writer.Write("</root>"); writer.Flush();
Thread.Sleep(2000);
doneWriting.Set();
});
var t2 = Task.Run(() =>
{
doneWriting.WaitOne();
stream.Dispose();
Console.WriteLine("Disposed of Stream");
});
var t3 = Task.Run(async () =>
{
serverIsUp.WaitOne();
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
socket.Connect(endpoint);
stream = new NetworkStream(socket, true);
var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });
bool val;
while (val = await reader.ReadAsync())
{
bool kill = false;
switch (reader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("Start: " + reader.LocalName);
break;
case XmlNodeType.EndElement:
Console.WriteLine("End: " + reader.LocalName);
//kill = true; // I could use a particular EndElement as a signal to not try another read
break;
case XmlNodeType.Text:
Console.WriteLine("Text: " + await reader.GetValueAsync());
break;
}
if (kill) { break; }
}
// Ideally once the underlying stream is closed, ReadAsync() would return false
// we would get here and could safely dispose the reader, but that's not the case
// ReadAsync() continues to block
reader.Dispose();
Console.WriteLine("Disposed of Reader");
});
Console.ReadLine();
}
}
}