运用 SocketAsyncEventArgs 进行异步操作
服务端三个类
第一个类: SocketAsyncEventArgsPool
/// <summary>
/// A fixed-purpose, thread-safe LIFO pool of reusable <see cref="SocketAsyncEventArgs"/> instances.
/// </summary>
internal sealed class SocketAsyncEventArgsPool
{
    /// <summary>
    /// Backing store; it also serves as the lock object because it never escapes this class.
    /// </summary>
    private readonly Stack<SocketAsyncEventArgs> pool;

    /// <summary>
    /// Initializes the object pool to the specified size.
    /// </summary>
    /// <param name="capacity">Initial number of SocketAsyncEventArgs objects the pool can hold.</param>
    internal SocketAsyncEventArgsPool(Int32 capacity)
    {
        this.pool = new Stack<SocketAsyncEventArgs>(capacity);
    }

    /// <summary>
    /// Removes a SocketAsyncEventArgs instance from the pool.
    /// </summary>
    /// <returns>The most recently pushed instance, or <c>null</c> when the pool is empty.</returns>
    internal SocketAsyncEventArgs Pop()
    {
        lock (this.pool)
        {
            return this.pool.Count > 0 ? this.pool.Pop() : null;
        }
    }

    /// <summary>
    /// Adds a SocketAsyncEventArgs instance to the pool.
    /// </summary>
    /// <param name="item">SocketAsyncEventArgs instance to add to the pool.</param>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
    internal void Push(SocketAsyncEventArgs item)
    {
        if (item == null)
        {
            // The single-string constructor treats its argument as the parameter NAME,
            // so the name and the message must be passed separately.
            throw new ArgumentNullException("item", "Items added to a SocketAsyncEventArgsPool cannot be null");
        }

        lock (this.pool)
        {
            this.pool.Push(item);
        }
    }
}
第二个类: Token
/// <summary>
/// Delegate for a method that takes a <see cref="SocketAsyncEventArgs"/> and returns no value.
/// </summary>
/// <param name="args">The SocketAsyncEventArgs passed to the handler.</param>
// NOTE(review): nothing in the visible source references this delegate type — confirm it is still needed.
delegate void ProcessData(SocketAsyncEventArgs args);
/// <summary>
/// Per-connection state stored in <see cref="SocketAsyncEventArgs.UserToken"/>:
/// the accepted socket plus the text received so far for the current message.
/// </summary>
internal sealed class Token : IDisposable
{
    private readonly Socket connection;

    // Text decoded so far for the message currently being received.
    private readonly StringBuilder sb;

    // Stateful decoder: a multi-byte UTF-8 character may be split across two receives,
    // so the incomplete tail has to be carried over instead of decoded chunk by chunk.
    private readonly Decoder decoder = Encoding.UTF8.GetDecoder();

    // Number of BYTES accepted into the current message.
    private Int32 currentIndex;

    /// <summary>
    /// Class constructor.
    /// </summary>
    /// <param name="connection">Socket to accept incoming data.</param>
    /// <param name="bufferSize">Maximum number of bytes buffered for one message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connection"/> is <c>null</c>.</exception>
    internal Token(Socket connection, Int32 bufferSize)
    {
        if (connection == null)
        {
            throw new ArgumentNullException("connection");
        }

        this.connection = connection;
        this.sb = new StringBuilder(bufferSize);
    }

    /// <summary>
    /// Accept socket.
    /// </summary>
    internal Socket Connection
    {
        get { return this.connection; }
    }

    /// <summary>
    /// Discards everything buffered so far without sending it.
    /// </summary>
    internal void ProcessUserRegisterData()
    {
        this.ResetMessage();
    }

    /// <summary>
    /// Process data received from the client: logs the buffered message, loads its UTF-8
    /// bytes into <paramref name="args"/> as the buffer to send back, and clears the
    /// message state so the connection can receive the next message.
    /// </summary>
    /// <param name="args">SocketAsyncEventArgs used in the operation.</param>
    internal void ProcessData(SocketAsyncEventArgs args)
    {
        // Get the message received from the client.
        String received = this.sb.ToString();

        // The handle is formatted as an IntPtr: casting it to int can overflow on 64-bit.
        // The byte count comes from currentIndex; received.Length counts characters.
        Console.WriteLine(
            "Received From Handle[{2}] Message: {0} . The server has read {1} bytes.",
            received,
            this.currentIndex,
            this.connection.Handle.ToString());

        Byte[] sendBuffer = Encoding.UTF8.GetBytes(received);

        // The offset applies to the NEW buffer, so it must be 0; reusing args.Offset
        // (an offset into the previous buffer) throws as soon as it is non-zero.
        args.SetBuffer(sendBuffer, 0, sendBuffer.Length);

        // Clear the message state, so it can receive more data from a keep-alive connection client.
        this.ResetMessage();
    }

    /// <summary>
    /// Set data received from the client: appends the bytes transferred by the last
    /// receive to the current message.
    /// </summary>
    /// <remarks>
    /// A chunk that would push the message past the buffer capacity is dropped silently;
    /// this best-effort behavior is kept from the original implementation.
    /// </remarks>
    /// <param name="args">SocketAsyncEventArgs used in the operation.</param>
    internal void SetData(SocketAsyncEventArgs args)
    {
        Int32 count = args.BytesTransferred;
        if ((this.currentIndex + count) > this.sb.Capacity)
        {
            return;
        }

        char[] chars = new char[this.decoder.GetCharCount(args.Buffer, args.Offset, count)];
        int written = this.decoder.GetChars(args.Buffer, args.Offset, count, chars, 0);
        this.sb.Append(chars, 0, written);
        this.currentIndex += count;
    }

    /// <summary>
    /// Resets text, byte counter and decoder state for the next message.
    /// </summary>
    private void ResetMessage()
    {
        this.sb.Length = 0;
        this.currentIndex = 0;
        this.decoder.Reset();
    }

    #region IDisposable Members

    /// <summary>
    /// Release instance: shuts down the sending side if possible, then closes the socket.
    /// </summary>
    public void Dispose()
    {
        try
        {
            this.connection.Shutdown(SocketShutdown.Send);
        }
        catch (SocketException)
        {
            // The client has already closed or the socket was never connected; nothing to shut down.
        }
        catch (ObjectDisposedException)
        {
            // Already closed; Close() below is a no-op.
        }
        finally
        {
            this.connection.Close();
        }
    }

    #endregion
}
第三个类: SocketListenerServer
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

/// <summary>
/// TCP echo server built on <see cref="SocketAsyncEventArgs"/>: accepts up to a fixed number
/// of simultaneous clients and sends every received message back to its sender.
/// </summary>
class SocketListenerServer
{
    private const int DefaultConnections = 10;
    private const int DefaultBufferSize = 3072;

    // Signalled by Stop() to release the thread blocked inside Start().
    private static readonly AutoResetEvent mutex = new AutoResetEvent(false);

    // Maximum number of simultaneous client connections.
    private int numConnections = DefaultConnections;

    // Size in bytes of each receive/send buffer.
    private int bufferSize = DefaultBufferSize;

    // Listening socket.
    Socket listenSocket;

    // Limits accepted clients to numConnections; one slot is held per live connection.
    Semaphore semaphoreAcceptedClients;

    // Number of clients currently connected.
    int m_numConnectedSockets;

    // Pool of reusable SocketAsyncEventArgs for receive/send operations.
    SocketAsyncEventArgsPool m_readWritePool;

    /// <summary>
    /// Creates a server with the default limits (10 connections, 3072-byte buffers).
    /// </summary>
    public SocketListenerServer()
        : this(DefaultConnections, DefaultBufferSize)
    {
        // Chaining is required: without it the pool and semaphore stay null and Start() fails.
    }

    /// <summary>
    /// Creates a server and pre-allocates one SocketAsyncEventArgs per allowed connection.
    /// </summary>
    /// <param name="intNumConnections">Maximum number of simultaneous connections; must be positive.</param>
    /// <param name="intReceiveBufferSize">Buffer size in bytes per connection; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">Either argument is zero or negative.</exception>
    public SocketListenerServer(int intNumConnections, int intReceiveBufferSize)
    {
        if (intNumConnections <= 0)
        {
            throw new ArgumentOutOfRangeException("intNumConnections");
        }

        if (intReceiveBufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException("intReceiveBufferSize");
        }

        this.numConnections = intNumConnections;
        this.bufferSize = intReceiveBufferSize;
        m_numConnectedSockets = 0;

        m_readWritePool = new SocketAsyncEventArgsPool(intNumConnections);
        semaphoreAcceptedClients = new Semaphore(intNumConnections, intNumConnections);

        for (int i = 0; i < numConnections; i++)
        {
            SocketAsyncEventArgs readWriteAsyncEvent = new SocketAsyncEventArgs();
            readWriteAsyncEvent.Completed += new EventHandler<SocketAsyncEventArgs>(readWriteAsyncEvent_Completed);
            readWriteAsyncEvent.SetBuffer(new Byte[this.bufferSize], 0, this.bufferSize);
            this.m_readWritePool.Push(readWriteAsyncEvent);
        }
    }

    /// <summary>
    /// Completion callback shared by all pooled receive/send SocketAsyncEventArgs.
    /// </summary>
    /// <exception cref="ArgumentException">The completed operation was neither a receive nor a send.</exception>
    protected void readWriteAsyncEvent_Completed(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                ProcessReceive(e);
                break;
            case SocketAsyncOperation.Send:
                ProcessSend(e);
                break;
            default:
                throw new ArgumentException("The last operation completed on the socket was not a receive or send");
        }
    }

    /// <summary>
    /// Starts the server and begins listening for incoming connection requests.
    /// Blocks the calling thread until <see cref="Stop"/> is called.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    internal void Start(Int32 port)
    {
        // Resolve the local addresses and build the endpoint from the last one listed.
        IPAddress[] addressList = Dns.GetHostEntry(Environment.MachineName).AddressList;
        IPEndPoint localEndPoint = new IPEndPoint(addressList[addressList.Length - 1], port);

        this.listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        this.listenSocket.ReceiveBufferSize = this.bufferSize;
        this.listenSocket.SendBufferSize = this.bufferSize;

        if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
        {
            // Option 27 is IPV6_V6ONLY; turning it off makes the listener dual-mode (IPv4 + IPv6).
            this.listenSocket.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);
            this.listenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));
        }
        else
        {
            this.listenSocket.Bind(localEndPoint);
        }

        this.listenSocket.Listen(numConnections);
        this.StartAccept(null);

        // Park the caller until Stop() signals.
        mutex.WaitOne();
    }

    /// <summary>
    /// Posts accept operations on the listening socket, looping for as long as accepts
    /// complete synchronously. Returns once an accept is pending or the listener is closed.
    /// </summary>
    /// <param name="acceptEventArg">Accept context to reuse, or <c>null</c> to create one.</param>
    private void StartAccept(SocketAsyncEventArgs acceptEventArg)
    {
        if (acceptEventArg == null)
        {
            acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(acceptEventArg_Completed);
        }

        // A loop instead of StartAccept -> ProcessAccept -> StartAccept recursion, so a burst of
        // synchronously completed accepts cannot grow the stack.
        bool pending = false;
        while (!pending)
        {
            // The context is reused, so the previously accepted socket must be cleared.
            acceptEventArg.AcceptSocket = null;

            // Wait for a free connection slot.
            semaphoreAcceptedClients.WaitOne();

            try
            {
                pending = listenSocket.AcceptAsync(acceptEventArg);
            }
            catch (ObjectDisposedException)
            {
                // Stop() closed the listener: give the slot back and stop accepting.
                semaphoreAcceptedClients.Release();
                return;
            }

            if (!pending)
            {
                ProcessAccept(acceptEventArg);
            }
        }
    }

    /// <summary>
    /// Handles one completed accept: on success starts receiving from the new client,
    /// on failure returns the connection slot. Does not post the next accept.
    /// </summary>
    private void ProcessAccept(SocketAsyncEventArgs acceptEventArg)
    {
        Socket s = acceptEventArg.AcceptSocket;

        if (acceptEventArg.SocketError != SocketError.Success || s == null || !s.Connected)
        {
            // Failed or aborted accept: nothing will ever call CloseClientSocket for it,
            // so the slot taken in StartAccept has to be returned here.
            if (s != null)
            {
                s.Close();
            }

            semaphoreAcceptedClients.Release();
            return;
        }

        SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
        if (readEventArgs == null)
        {
            // Not expected while the semaphore count equals the pool size; refuse the client
            // rather than leaving it connected with nothing servicing it.
            s.Close();
            semaphoreAcceptedClients.Release();
            return;
        }

        int connected = Interlocked.Increment(ref m_numConnectedSockets);
        Console.WriteLine("Client connection accepted.There are {0} clients connected to the server", connected);

        // Attach the per-connection state to the receive/send context.
        readEventArgs.UserToken = new Token(s, this.bufferSize);

        EndPoint remote = null;
        try
        {
            remote = s.RemoteEndPoint;

            // Post the first receive for this client.
            if (!s.ReceiveAsync(readEventArgs))
            {
                ProcessReceive(readEventArgs);
            }
        }
        catch (SocketException ex)
        {
            Console.WriteLine("Error when processing data received from {0}:\r\n{1}", remote, ex.ToString());
            CloseClientSocket(readEventArgs);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
            CloseClientSocket(readEventArgs);
        }
    }

    /// <summary>
    /// Called when an asynchronous receive completes. Closes the connection if the remote
    /// host closed it or the receive failed; otherwise buffers the data and echoes the
    /// message once no more bytes are waiting on the socket.
    /// </summary>
    private void ProcessReceive(SocketAsyncEventArgs readEventArgs)
    {
        // Zero bytes means the remote host closed the connection.
        if (readEventArgs.BytesTransferred <= 0)
        {
            CloseClientSocket(readEventArgs);
            return;
        }

        Token token = (Token)readEventArgs.UserToken;
        Socket s = token.Connection;

        if (readEventArgs.SocketError != SocketError.Success)
        {
            // UserToken holds a Token, not a Socket, so the endpoint is read through the token.
            // It is captured before the socket is closed.
            IPEndPoint localEp = s.LocalEndPoint as IPEndPoint;
            CloseClientSocket(readEventArgs);
            Console.WriteLine("Socket error {0} on endpoint {1} during {2}.", (Int32)readEventArgs.SocketError, localEp, readEventArgs.LastOperation);
            return;
        }

        token.SetData(readEventArgs);

        if (s.Available == 0)
        {
            // Nothing more to read: send the message back to the client it came from.
            // (The earlier per-user lookup was removed: it called a Token method that does not
            // exist and consulted a dictionary that was never populated.)
            token.ProcessData(readEventArgs);
            if (!s.SendAsync(readEventArgs))
            {
                ProcessSend(readEventArgs);
            }
        }
        else if (!s.ReceiveAsync(readEventArgs))
        {
            this.ProcessReceive(readEventArgs);
        }
    }

    /// <summary>
    /// Completion callback of the accept context: handles the accepted client and then
    /// posts the next accept.
    /// </summary>
    private void acceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
    {
        ProcessAccept(e);
        StartAccept(e);
    }

    /// <summary>
    /// Called when an asynchronous send completes. On success posts another receive to
    /// read the next message from the client; on failure closes the connection.
    /// </summary>
    private void ProcessSend(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success)
        {
            CloseClientSocket(e);
            return;
        }

        Token token = (Token)e.UserToken;

        // The send replaced the buffer with the payload; restore a full-size receive buffer.
        e.SetBuffer(new byte[this.bufferSize], 0, this.bufferSize);

        if (!token.Connection.ReceiveAsync(e))
        {
            ProcessReceive(e);
        }
    }

    /// <summary>
    /// Closes a client connection and recycles its SocketAsyncEventArgs.
    /// Safe to call more than once for the same context.
    /// </summary>
    private void CloseClientSocket(SocketAsyncEventArgs e)
    {
        Token token = e.UserToken as Token;
        if (token == null)
        {
            // Already closed; a second release would overflow the semaphore.
            return;
        }

        e.UserToken = null;
        token.Dispose();

        int remaining = Interlocked.Decrement(ref m_numConnectedSockets);
        Console.WriteLine("A client has been disconnected from the server. \r\n There are {0} clients connected to the server", remaining);

        // The context may still carry a short send buffer; the next client needs the full one.
        if (e.Buffer == null || e.Buffer.Length != this.bufferSize)
        {
            e.SetBuffer(new byte[this.bufferSize], 0, this.bufferSize);
        }
        else
        {
            e.SetBuffer(0, this.bufferSize);
        }

        // Return the context BEFORE freeing the slot, otherwise the acceptor can win the race
        // and find the pool empty.
        m_readWritePool.Push(e);
        semaphoreAcceptedClients.Release();
    }

    /// <summary>
    /// Stops listening and releases the thread blocked in <see cref="Start"/>.
    /// </summary>
    internal void Stop()
    {
        if (this.listenSocket != null)
        {
            this.listenSocket.Close();
        }

        // Signal rather than close: closing the event does not wake the thread waiting on it.
        mutex.Set();
    }
}