#include "stdafx.h"
#include "IOCP.h"
#pragma comment(lib, "ws2_32.lib")
#include "..\SmsCenterDlg.h"

/////////////////////////////////////////////////////////////////////////////////////////////

/*-------------------------------------------------------------------------------------------
Purpose : File-local helper. Waits up to dwWaitMs for a thread to finish, force-terminates it
          on timeout, closes the handle and marks it INVALID_HANDLE_VALUE.
Notes   : NULL / INVALID_HANDLE_VALUE handles are ignored. TerminateThread is a last resort
          kept from the original code; it does not run destructors on the killed thread.
Returns : nothing
-------------------------------------------------------------------------------------------*/
static void StopThreadHandle(HANDLE &hThread, DWORD dwWaitMs)
{
    if (hThread == INVALID_HANDLE_VALUE || hThread == NULL)
        return;

    if (WaitForSingleObject(hThread, dwWaitMs) == WAIT_TIMEOUT)
        TerminateThread(hThread, 0);    // did not exit in time: force it

    CloseHandle(hThread);
    hThread = INVALID_HANDLE_VALUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Constructor. Puts every handle/flag into a well-defined "nothing owned" state so that
          Close() can safely test them.
-------------------------------------------------------------------------------------------*/
CIOCP::CIOCP()
{
    m_pDlg            = NULL;
    m_listen_socket   = INVALID_SOCKET;
    m_listen_socket2  = INVALID_SOCKET;
    m_bInitSocket     = false;
    m_bInitSocket2    = false;
    m_bInit           = false;
    m_h_MainLoop      = INVALID_HANDLE_VALUE;
    m_h_MainLoop2     = INVALID_HANDLE_VALUE;
    m_bQuit           = false;
    m_h_iocp          = NULL;
    m_h_accept_event  = NULL;
    m_h_accept_event2 = NULL;
    m_n_thread_count  = 0;
}

CIOCP::~CIOCP()
{
    Close();
}

/*-------------------------------------------------------------------------------------------
Purpose : Shut down and release everything acquired by Init().
Notes   : No-op unless m_bInit is set. Order: raise m_bQuit, stop both main-loop threads,
          wake and join the worker threads, close the completion port, then release the key
          list, listening sockets and accept events.
          WSACleanup() is intentionally not called here (the original never did);
          NOTE(review): confirm whether other code relies on Winsock staying initialised.
Returns : nothing
-------------------------------------------------------------------------------------------*/
void CIOCP::Close()
{
    try
    {
        if ( !m_bInit )
            return;

        m_bQuit = true;

        StopThreadHandle(m_h_MainLoop, 3000);
        StopThreadHandle(m_h_MainLoop2, 3000);

        // Every worker blocks in GetQueuedCompletionStatus(INFINITE) and only looks at
        // m_bQuit after it wakes up, so post one wake-up packet per worker.
        if ( m_h_iocp != NULL )
        {
            for ( int i = 0; i < m_n_thread_count; i++ )
                PostQueuedCompletionStatus( m_h_iocp, 0, 0, NULL );
        }

        // Join the workers (1 s each, forced on timeout) before the port goes away.
        CloseThreadHandle( m_n_thread_count );
        m_n_thread_count = 0;

        if ( m_h_iocp != NULL )
        {
            CloseHandle( m_h_iocp );
            m_h_iocp = NULL;
        }

        Sleep(200);     // give anything else still running a moment to exit

        m_KeyList.Close();
        CloseMainSocket();

        if ( m_h_accept_event != NULL )
        {
            CloseHandle( m_h_accept_event );
            m_h_accept_event = NULL;
        }
        if ( m_h_accept_event2 != NULL )
        {
            CloseHandle( m_h_accept_event2 );
            m_h_accept_event2 = NULL;
        }

        m_bInit = false;
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Reset an IO context so it can be used for a new overlapped operation.
Notes   : Clears the OVERLAPPED, points wsaBuf at the start of the context's own buffer with
          the full BUFFER_SIZE available, and zeroes the received-byte counter. Pointers that
          do not lie inside m_KeyList.m_IO are ignored.
          NOTE(review): the upper bound uses '>' against &m_IO[MAX_LOGINUSER]; if m_IO has
          exactly MAX_LOGINUSER elements this accepts the one-past-the-end address - confirm
          the array size in the key-list header.
Returns : nothing
-------------------------------------------------------------------------------------------*/
void CIOCP::InitIoContext(IOCP_IO_PTR lp_io)
{
    if ( !lp_io )
        return;

    try
    {
        if ( lp_io < &m_KeyList.m_IO[0] || lp_io > &m_KeyList.m_IO[MAX_LOGINUSER] )
            return;

        memset( &lp_io->ol, 0, sizeof( WSAOVERLAPPED ) );
        lp_io->wsaBuf.buf = lp_io->buf;
        lp_io->wsaBuf.len = BUFFER_SIZE;
        lp_io->lRecvLen   = 0;
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Create both listening sockets and associate each with the completion port.
Notes   : Each socket gets its own key slot from m_KeyList; the key pointer is used as the
          completion key. On any failure everything acquired by this call is released again.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::InitSocket()
{
    // ---- listening socket 1 ----
    m_listen_socket = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
    if ( INVALID_SOCKET == m_listen_socket )
        return FALSE;

    IOCP_KEY_PTR lp_key = m_KeyList.GetBlank_Key();
    if ( !lp_key )
    {
        CloseOneSocket( m_listen_socket );
        return FALSE;
    }
    lp_key->socket = m_listen_socket;

    // The completion key is pointer sized: cast through ULONG_PTR so it survives x64 builds.
    HANDLE hRet = CreateIoCompletionPort( (HANDLE)m_listen_socket, m_h_iocp, (ULONG_PTR)lp_key, 0 );
    if ( hRet == NULL )
    {
        CloseOneSocket( m_listen_socket );
        m_KeyList.RemoveAt_Key( lp_key );
        return FALSE;
    }

    // ---- listening socket 2 ----
    m_listen_socket2 = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
    if ( INVALID_SOCKET == m_listen_socket2 )
    {
        CloseOneSocket( m_listen_socket );
        m_KeyList.RemoveAt_Key( lp_key );
        return FALSE;
    }

    IOCP_KEY_PTR lp_key2 = m_KeyList.GetBlank_Key();
    if ( !lp_key2 )
    {
        CloseOneSocket( m_listen_socket );
        m_KeyList.RemoveAt_Key( lp_key );
        CloseOneSocket( m_listen_socket2 );
        return FALSE;
    }
    lp_key2->socket = m_listen_socket2;

    hRet = CreateIoCompletionPort( (HANDLE)m_listen_socket2, m_h_iocp, (ULONG_PTR)lp_key2, 0 );
    if ( hRet == NULL )
    {
        CloseOneSocket( m_listen_socket );
        m_KeyList.RemoveAt_Key( lp_key );
        CloseOneSocket( m_listen_socket2 );
        m_KeyList.RemoveAt_Key( lp_key2 );
        return FALSE;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Stop and close the first 'count' worker threads in m_h_thread.
Notes   : Each thread is given 1 second to exit before it is force-terminated.
Returns : nothing
-------------------------------------------------------------------------------------------*/
void CIOCP::CloseThreadHandle(int count )
{
    if ( count <= 0 )
        return;

    for ( int i = 0; i < count; i++ )
        StopThreadHandle( m_h_thread[i], 1000 );
}

/*-------------------------------------------------------------------------------------------
Purpose : Bind listening socket 1 to the configured address/port and start listening.
Notes   : The address comes from m_pDlg->m_Setup.addr; the port is DEFAULT_SERVERPORT_UNI in
          UNICODE builds and DEFAULT_LISTENPORT otherwise. Sets m_bInitSocket on success.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::BindAndListenSocket()
{
    SOCKADDR_IN addr;
    memset( &addr, 0, sizeof(SOCKADDR_IN) );
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = m_pDlg->m_Setup.addr;
#ifdef UNICODE
    addr.sin_port = htons( DEFAULT_SERVERPORT_UNI );
#else
    addr.sin_port = htons( DEFAULT_LISTENPORT );
#endif

    if ( SOCKET_ERROR == bind( m_listen_socket, (SOCKADDR*)&addr, sizeof( addr ) ) )
        return FALSE;

    if ( SOCKET_ERROR == listen( m_listen_socket, 50 ) )
        return FALSE;

    m_bInitSocket = true;
    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Start the completion-port worker threads (CompletionRoutine).
Notes   : Thread count is 2 x number of processors; values below 2 or above MAXTHREAD_COUNT
          are replaced by MAXTHREAD_COUNT. If a thread cannot be created, the threads already
          started are woken and joined, the completion port is closed and m_h_iocp /
          m_n_thread_count are reset.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::StartThread()
{
    SYSTEM_INFO sys_info;
    GetSystemInfo( &sys_info );

    m_n_thread_count = sys_info.dwNumberOfProcessors * 2;
    if ( m_n_thread_count < 2 || m_n_thread_count > MAXTHREAD_COUNT )
        m_n_thread_count = MAXTHREAD_COUNT;

    for ( int i = 0; i < m_n_thread_count; i++ )
    {
        m_h_thread[i] = CreateThread( NULL, 0, CompletionRoutine, (LPVOID)this, 0, NULL );

        // CreateThread reports failure with NULL, not INVALID_HANDLE_VALUE.
        if ( NULL == m_h_thread[i] )
        {
            m_h_thread[i] = INVALID_HANDLE_VALUE;

            // Let the workers already running leave GetQueuedCompletionStatus on their own.
            m_bQuit = true;
            for ( int k = 0; k < i; k++ )
                PostQueuedCompletionStatus( m_h_iocp, 0, 0, NULL );

            CloseThreadHandle( i );
            m_n_thread_count = 0;

            CloseHandle( m_h_iocp );
            m_h_iocp = NULL;
            return FALSE;
        }
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Post a batch of overlapped AcceptEx calls on listening socket 1.
Notes   : 10 accepts in _DEBUG builds, 30 otherwise. Each accept socket gets a 128 KB send
          buffer and a 64 KB receive buffer and an IO slot from m_KeyList. AcceptEx is posted
          with a receive length of 0, so it completes as soon as the connection arrives.
          Individual failures are skipped; the accept socket is closed in that case.
Returns : always TRUE
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::PostAcceptEx()
{
#ifdef _DEBUG
    int count = 10;
#else
    int count = 30;
#endif

    DWORD dwBytes = 0;
    BOOL  bRet;

    for ( int i = 0; i < count; i++ )
    {
        SOCKET sAccept = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
        if ( INVALID_SOCKET == sAccept )
            continue;

        // Socket buffer sizes.
        u_long ulTemp = 128*1024;
        setsockopt( sAccept, SOL_SOCKET, SO_SNDBUF, (const char *)&ulTemp, sizeof(ulTemp) );
        ulTemp = 64*1024;
        setsockopt( sAccept, SOL_SOCKET, SO_RCVBUF, (const char *)&ulTemp, sizeof(ulTemp) );

        IOCP_IO_PTR lp_io = m_KeyList.GetBlank_IO();
        if ( lp_io == NULL )
        {
            closesocket( sAccept );     // no free IO slot: do not leak the socket
            continue;
        }

        InitIoContext( lp_io );
        lp_io->socket    = sAccept;
        lp_io->operation = IOCP_ACCEPT;
        lp_io->state     = SOCKET_STATE_NOT_CONNECT;

        bRet = lpAcceptEx( m_listen_socket, lp_io->socket, lp_io->buf,
                           0,                               // do not wait for initial data
                           sizeof(SOCKADDR_IN) + 16,
                           sizeof(SOCKADDR_IN) + 16,
                           &dwBytes, &lp_io->ol );
        if ( ( bRet == FALSE ) && ( WSA_IO_PENDING != WSAGetLastError() ) )
        {
            closesocket( sAccept );
            m_KeyList.RemoveAt_IO( lp_io );
            continue;
        }
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : React to a completed operation and decide what the IO context does next.
Notes   : nFlags selects the event:
            IOCP_COMPLETE_ACCEPT      - reset the context and switch it to IOCP_READ
            IOCP_COMPLETE_ACCEPT_READ - nothing (handling is disabled)
            IOCP_COMPLETE_READ        - add the received bytes (wsaBuf.len) to lRecvLen, parse
                                        frames, then either re-arm the read behind the
                                        unconsumed data or drop the connection
            IOCP_COMPLETE_WRITE       - lp_io is really a heap-allocated send context; free it
          The context only records the next operation here; DataAction() issues it.
Returns : FALSE for an unknown nFlags value, TRUE otherwise
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::HandleData(IOCP_IO_PTR lp_io,IOCP_KEY_PTR lp_key, int nFlags)
{
    switch ( nFlags )
    {
    case IOCP_COMPLETE_ACCEPT:
        InitIoContext( lp_io );
        lp_io->operation = IOCP_READ;
        break;

    case IOCP_COMPLETE_ACCEPT_READ:
        break;

    case IOCP_COMPLETE_READ:
        {
            lp_io->lRecvLen += lp_io->wsaBuf.len;

            long lRet = 0;
            if ( lp_io->m_lLocalPort == DEFAULT_LISTENPORT2 )
                lRet = GetFrame_Cmpp(lp_io);
            else
                lRet = GetFrame(lp_io);

            // Drop the connection when the frame parser rejected the data, or when the
            // buffer is full and still holds no complete frame.
            if ( lRet < 0 || lp_io->lRecvLen >= BUFFER_SIZE )
            {
                // Close the socket before the slot is released so that a reused slot can
                // never have its new socket closed from here (same order as the worker).
                closesocket( lp_io->socket );
                m_KeyList.RemoveAt_IO( lp_io );
                m_KeyList.RemoveAt_Key( lp_key );
                // NOTE(review): the slot is written after it was released (kept from the
                // original so DataAction() sees IOCP_END) - confirm RemoveAt_IO semantics.
                lp_io->operation = IOCP_END;    // closed: DataAction() must not re-arm it
            }
            else
            {
                // Re-arm the read right behind the bytes that are still buffered.
                lp_io->operation = IOCP_READ;
                memset( &lp_io->ol, 0, sizeof(lp_io->ol) );
                lp_io->wsaBuf.buf = lp_io->buf + lp_io->lRecvLen;
                lp_io->wsaBuf.len = BUFFER_SIZE - lp_io->lRecvLen;
            }
        }
        break;

    case IOCP_COMPLETE_WRITE:
        {
            try
            {
                IOCP_IO_SEND_PTR ioS = (IOCP_IO_SEND_PTR)lp_io;
                if ( ioS->wsaBuf.len > 0 )
                {
                    // NOTE(review): scalar delete kept from the original; if the buffer is
                    // allocated with new[] this should be delete[] - check the sender.
                    if ( ioS->wsaBuf.buf )
                        delete ioS->wsaBuf.buf;
                    ioS->wsaBuf.buf = NULL;
                }
                delete ioS;
            }
            catch (...)
            {
                LOG_APPERROR(_T("T"));
                break;
            }
        }
        break;

    default:
        return FALSE;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Issue the overlapped operation recorded in lp_io->operation.
Notes   : IOCP_WRITE posts WSASend, IOCP_READ posts WSARecv; IOCP_WRITE2 and IOCP_END do
          nothing. If posting fails (anything other than WSA_IO_PENDING) the socket is closed
          and the IO/key slots are released.
Returns : TRUE if the operation was posted or nothing had to be done, FALSE on failure or for
          an unknown operation
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::DataAction(IOCP_IO_PTR lp_io, IOCP_KEY_PTR lp_key )
{
    DWORD dwBytes = 0;
    DWORD dwFlags = 0;
    int   nRet;

    switch ( lp_io->operation )
    {
    case IOCP_WRITE:
        nRet = WSASend( lp_io->socket, &lp_io->wsaBuf, 1, &dwBytes, 0, &lp_io->ol, NULL );
        break;

    case IOCP_READ:
        nRet = WSARecv( lp_io->socket, &lp_io->wsaBuf, 1, &dwBytes, &dwFlags, &lp_io->ol, NULL );
        break;

    case IOCP_WRITE2:   // chained-send support is disabled
    case IOCP_END:      // connection already closed by HandleData()
        return TRUE;

    default:
        return FALSE;
    }

    if ( ( nRet == SOCKET_ERROR ) && ( WSAGetLastError() != WSA_IO_PENDING ) )
    {
        closesocket( lp_io->socket );
        m_KeyList.RemoveAt_IO( lp_io );
        m_KeyList.RemoveAt_Key( lp_key );
        return FALSE;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Fetch the Winsock extension function pointers (AcceptEx, TransmitFile,
          GetAcceptExSockaddrs) through WSAIoctl on listening socket 1.
Notes   : On any failure listening socket 1 is closed.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::GetFunPointer()
{
    DWORD dwRet = 0;
    int   nRet;

    nRet = WSAIoctl( m_listen_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
                     &g_GUIDAcceptEx, sizeof(g_GUIDAcceptEx),
                     &lpAcceptEx, sizeof(lpAcceptEx),
                     &dwRet, NULL, NULL );
    if ( SOCKET_ERROR == nRet )
    {
        CloseOneSocket(m_listen_socket);
        return FALSE;
    }

    nRet = WSAIoctl( m_listen_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
                     &g_GUIDTransmitFile, sizeof(g_GUIDTransmitFile),
                     &lpTransmitFile, sizeof(lpTransmitFile),
                     &dwRet, NULL, NULL );
    if ( SOCKET_ERROR == nRet )
    {
        CloseOneSocket(m_listen_socket);
        return FALSE;
    }

    nRet = WSAIoctl( m_listen_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
                     &g_GUIDAcceptExaddrs, sizeof(g_GUIDAcceptExaddrs),
                     &lpGetAcceptExSockAddrs, sizeof(lpGetAcceptExSockAddrs),
                     &dwRet, NULL, NULL );
    if ( SOCKET_ERROR == nRet )
    {
        CloseOneSocket(m_listen_socket);
        return FALSE;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Kick connections that are connected but idle ("invalid connections").
Notes   : Scans the first MAX_LOGINUSER IO slots that hold a socket and a positive lRandID:
            - accepted but never reached the CONNECT_AND_READ state within 5000 ms
            - in CONNECT_AND_READ state, a frame was seen before, but none for more than
              RECVREFRESH_TIMEOUT (m_lID is set to 199992 as the "dropped" marker)
            - in CONNECT_AND_READ state, no valid frame at all within 40000 ms of the accept
          The socket is only closed here; NOTE(review): slot release presumably happens in
          the worker thread when the pending operation completes with an error - confirm.
Returns : nothing
-------------------------------------------------------------------------------------------*/
void CIOCP::CheckForInvalidConnection()
{
    try
    {
        TCHAR szMsg[256] = {0};
        DWORD dwNow = ::GetTickCount();

        IOCP_IO_PTR lp_io = m_KeyList.m_IO;
        if ( !lp_io )
            return;

        for ( int i = 0; i < MAX_LOGINUSER; i++ )
        {
            IOCP_IO_PTR pCur = &lp_io[i];

            if ( pCur->socket == NULL || pCur->socket == INVALID_SOCKET || !( pCur->lRandID > 0 ) )
                continue;

            if ( pCur->dwAcceptTime > 0 && pCur->state != SOCKET_STATE_CONNECT_AND_READ )
            {
                if ( dwNow - pCur->dwAcceptTime > 5000 )            // connect timeout: 5 s
                {
                    _stprintf(szMsg, _T("[%d]%s:%d 连接超时!"), pCur->m_lLocalPort, pCur->m_szIP, pCur->m_lPort);
                    m_pDlg->AddLog(szMsg);
                    closesocket( pCur->socket );
                }
            }

            if ( pCur->state == SOCKET_STATE_CONNECT_AND_READ )
            {
                if ( pCur->dwEndRecvFrame > 0 )
                {
                    if ( dwNow - pCur->dwEndRecvFrame > RECVREFRESH_TIMEOUT )
                    {
                        pCur->m_lID = 199992;                       // marker: dropped for inactivity
                        closesocket( pCur->socket );
                    }
                }
                else
                {
                    if ( dwNow - pCur->dwAcceptTime > 40000 )       // no valid frame within 40 s
                    {
                        _stprintf(szMsg, _T("[%d]%s:%d 接收数据超时!"), pCur->m_lLocalPort, pCur->m_szIP, pCur->m_lPort);
                        m_pDlg->AddLog(szMsg);
                        closesocket( pCur->socket );
                    }
                }
            }
        }
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Register FD_ACCEPT on listening socket 1 with m_h_accept_event, so MainLoop is
          signalled when a connection arrives and no posted accept is left to take it.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::RegAcceptEvent()
{
    m_h_accept_event = CreateEvent( NULL, FALSE, FALSE, NULL );
    if ( NULL == m_h_accept_event )
        return FALSE;

    if ( WSAEventSelect( m_listen_socket, m_h_accept_event, FD_ACCEPT ) != 0 )
    {
        CloseHandle( m_h_accept_event );
        m_h_accept_event = NULL;        // do not leave a stale handle behind
        return FALSE;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Read the peer address and port out of an accept buffer.
Notes   : The SOCKADDR_IN is read at offset BUFFER_SIZE - sizeof(SOCKADDR_IN) - 16 of buf.
          'ip' receives the dotted-decimal text; the caller must supply at least 16 chars.
          The port is converted with ntohs (sin_port is a 16-bit field).
Returns : TRUE on success, FALSE if buf or ip is NULL
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::GetAddrAndPort(char*buf,char ip[],UINT &port)
{
    if ( !buf || !ip )
        return FALSE;

    int   len    = BUFFER_SIZE - sizeof( SOCKADDR_IN ) - 16;
    char *lp_buf = buf + len;           // the remote address sits at the tail of the buffer

    SOCKADDR_IN addr;
    memcpy( &addr, lp_buf, sizeof( SOCKADDR_IN ) );

    port = ntohs( addr.sin_port );
    strcpy( ip, inet_ntoa( addr.sin_addr ) );

    return TRUE;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////

/*-------------------------------------------------------------------------------------------
Purpose : Bring the whole server up: Winsock, key list, completion port, worker threads,
          listening sockets, the first batch of accepts, accept events and the two
          housekeeping threads (MainLoop, MainLoop2).
Notes   : m_bInit is raised as soon as there is something for Close() to release, so every
          failure path below can simply call Close(); Close() lowers it again.
          Listening port 2 (BindAndListenSocket2) is currently disabled.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::Init()
{
    WSAData data;
    if ( WSAStartup( MAKEWORD(2,2), &data ) != 0 )
        return FALSE;

    // Workers test m_bQuit after every completion; clear it before any of them starts,
    // otherwise a re-Init after Close() would make them exit immediately.
    m_bQuit        = false;
    m_bInitSocket  = false;
    m_bInitSocket2 = false;

    m_KeyList.Init(m_pDlg);
    m_bInit = true;

    m_h_iocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
    if ( NULL == m_h_iocp )
    {
        Close();
        return FALSE;
    }

    if ( !StartThread() )
    {
        Close();
        return FALSE;
    }

    if ( !InitSocket() )
    {
        Close();
        return FALSE;
    }

    if ( !BindAndListenSocket() )       // bind port 1
    {
        Close();
        return FALSE;
    }

    m_bInitSocket2 = false;             // the second listening port is not enabled

    if ( !m_bInitSocket && !m_bInitSocket2 )    // neither port could be initialised
    {
        Close();
        return FALSE;
    }

    if ( !GetFunPointer() )
    {
        Close();
        return FALSE;
    }

    // A disabled port counts as "not working"; at least one enabled port must succeed.
    BOOL b1 = m_bInitSocket  ? PostAcceptEx()  : FALSE;
    BOOL b2 = m_bInitSocket2 ? PostAcceptEx2() : FALSE;
    if ( !b1 && !b2 )
    {
        Close();
        return FALSE;
    }

    b1 = m_bInitSocket  ? RegAcceptEvent()  : FALSE;
    b2 = m_bInitSocket2 ? RegAcceptEvent2() : FALSE;
    if ( !b1 && !b2 )
    {
        Close();
        return FALSE;
    }

    // CreateThread reports failure with NULL; Close() expects INVALID_HANDLE_VALUE for
    // "no thread", so normalise before cleaning up.
    m_h_MainLoop = CreateThread( NULL, 0, MainLoop, (LPVOID)this, 0, NULL );
    if ( m_h_MainLoop == NULL )
    {
        m_h_MainLoop = INVALID_HANDLE_VALUE;
        this->Close();
        return false;
    }

    m_h_MainLoop2 = CreateThread( NULL, 0, MainLoop2, (LPVOID)this, 0, NULL );
    if ( m_h_MainLoop2 == NULL )
    {
        m_h_MainLoop2 = INVALID_HANDLE_VALUE;
        this->Close();
        return false;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Housekeeping thread 1. Waits on the accept event of port 1 and posts a fresh batch
          of accepts whenever it is signalled.
Notes   : lp_param is the CIOCP instance. Polls m_bQuit every 300 ms. If the wait itself
          fails, the error is logged, all channels are stopped through the dialog
          (ON_TD_StopAll2) and a new batch of accepts is posted.
Returns : TRUE on normal exit, false if an exception was caught
-------------------------------------------------------------------------------------------*/
DWORD CIOCP::MainLoop(LPVOID lp_param)
{
    try
    {
        AutoCoInitializeEx AutoCoInit;      // initialises / tears down COM for this thread
        CIOCP * piocp = (CIOCP *)lp_param;

        while ( !piocp->m_bQuit )
        {
            DWORD dwRet = WaitForSingleObject( piocp->m_h_accept_event, 300 );
            switch ( dwRet )
            {
            case WAIT_FAILED:
                {
                    CString str;
                    str.Format( _T("Accept_Event Error:%d") , GetLastError());
                    piocp->m_pDlg->AddLog(str);
                    piocp->m_pDlg->ON_TD_StopAll2();    // possibly out of resources: stop all channels
                    piocp->PostAcceptEx();
                }
                break;

            case WAIT_TIMEOUT:
                break;

            case WAIT_OBJECT_0:     // all posted accepts are used up: post another batch
                piocp->PostAcceptEx();
                break;
            }
        }
        return TRUE;
    }
    catch(...)
    {
        LOG_APPERROR(_T("T"));
        return false;
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Worker thread body. Dequeues completions from the port and drives each connection
          through accept -> read -> read ... until it is dropped.
Notes   : lp_param is the CIOCP instance. The completion key is an IOCP_KEY_PTR and the
          OVERLAPPED pointer is reinterpreted as an IOCP_IO_PTR (for IOCP_WRITE it is really
          a heap-allocated send context that HandleData frees). The thread exits as soon as
          it wakes up and finds m_bQuit set.
Returns : 0 on normal exit, 1 if an exception was caught
-------------------------------------------------------------------------------------------*/
DWORD CIOCP::CompletionRoutine(LPVOID lp_param)
{
    USES_CONVERSION;
    try
    {
        AutoCoInitializeEx AutoCoInit;      // initialises / tears down COM for this thread
        CIOCP* lp_this = (CIOCP*)lp_param;

        int             nRet;
        BOOL            bRet;
        DWORD           dwBytes    = 0;
        HANDLE          hRet;
        IOCP_KEY_PTR    lp_key     = NULL;
        IOCP_IO_PTR     lp_io      = NULL;
        LPWSAOVERLAPPED lp_ov      = NULL;
        IOCP_KEY_PTR    lp_new_key = NULL;

        while ( TRUE )
        {
            // On failure without a dequeued packet the out-parameters are not written;
            // reset them so no value from the previous iteration is reused.
            dwBytes = 0;
            lp_key  = NULL;
            lp_ov   = NULL;

            bRet = GetQueuedCompletionStatus( lp_this->m_h_iocp, &dwBytes, (PULONG_PTR)&lp_key, &lp_ov, INFINITE );

            if ( lp_this->m_bQuit )         // shutdown requested
                return 0;

            lp_io = (IOCP_IO_PTR)lp_ov;

            if ( lp_io && !AfxIsValidAddress(&lp_io->operation,4,true) )    // bogus context pointer
                continue;

            // A failed or zero-byte send still owns its heap context: free it.
            if ( (bRet==0 || dwBytes ==0 ) && lp_io && lp_io->operation== IOCP_WRITE )
            {
                lp_this->HandleData( lp_io, lp_key, IOCP_COMPLETE_WRITE );
                continue;
            }

            if ( bRet == 0 )
            {
                if ( lp_key && lp_io && lp_io->lRandID>0 )
                {
                    lp_this->m_KeyList.RemoveAt_IO( lp_io );
                    lp_this->m_KeyList.RemoveAt_Key( lp_key );
                    continue;
                }
                if ( !lp_key && !lp_io )
                    continue;
            }

            if ( NULL == lp_key || NULL == lp_io )
                continue;

            // NOTE(review): both range checks use '>' against element [MAX_LOGINUSER]; if
            // the arrays hold exactly MAX_LOGINUSER elements the one-past-the-end address
            // is accepted - confirm the array sizes.
            if ( lp_io->operation!=IOCP_WRITE
                 && ( lp_io < &lp_this->m_KeyList.m_IO[0] || lp_io > &lp_this->m_KeyList.m_IO[MAX_LOGINUSER] ) )
                continue;
            if ( lp_key < &lp_this->m_KeyList.m_Key[0] || lp_key > &lp_this->m_KeyList.m_Key[MAX_LOGINUSER] )
                continue;

            // Zero bytes on anything but an accept means the peer went away.
            if ( ( IOCP_ACCEPT != lp_io->operation ) && ( 0 == dwBytes ) )
            {
                closesocket( lp_io->socket );
                lp_this->m_KeyList.RemoveAt_IO( lp_io );
                lp_this->m_KeyList.RemoveAt_Key( lp_key );
                continue;
            }

            switch ( lp_io->operation )
            {
            case IOCP_ACCEPT:
                {
                    lp_io->state = SOCKET_STATE_CONNECT;
                    if ( dwBytes > 0 )
                        lp_io->state = SOCKET_STATE_CONNECT_AND_READ;

                    // Extract local / remote address information from the accept buffer.
                    SOCKADDR *local  = NULL;
                    SOCKADDR *remote = NULL;
                    int local_len;
                    int remote_len;
                    lp_this->lpGetAcceptExSockAddrs( lp_io->buf, 0,
                                                     sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,
                                                     &local, &local_len, &remote, &remote_len );
                    SOCKADDR_IN * pAA = ((SOCKADDR_IN *)local);
                    SOCKADDR_IN * pBB = ((SOCKADDR_IN *)remote);
#ifdef UNICODE
                    _tcscpy(lp_io->m_szIP,A2W(inet_ntoa(pBB->sin_addr)));
#else
                    strcpy(lp_io->m_szIP,inet_ntoa(pBB->sin_addr));
#endif
                    lp_io->m_lPort      = ntohs(pBB->sin_port);
                    lp_io->m_lLocalPort = ntohs(pAA->sin_port);

                    // Inherit the properties of whichever listening socket accepted it.
#ifdef UNICODE
                    if (lp_io->m_lLocalPort == DEFAULT_SERVERPORT_UNI )
#else
                    if (lp_io->m_lLocalPort == DEFAULT_LISTENPORT )
#endif
                    {
                        nRet = setsockopt( lp_io->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                                           ( char* )&lp_this->m_listen_socket, sizeof( lp_this->m_listen_socket ) );
                    }
                    else
                    {
                        nRet = setsockopt( lp_io->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                                           ( char* )&lp_this->m_listen_socket2, sizeof( lp_this->m_listen_socket2 ) );
                    }
                    if ( SOCKET_ERROR == nRet )
                    {
                        closesocket( lp_io->socket );
                        lp_this->m_KeyList.RemoveAt_IO( lp_io );
                        continue;
                    }

                    lp_new_key = lp_this->m_KeyList.GetBlank_Key();
                    if ( lp_new_key == NULL )
                    {
                        closesocket( lp_io->socket );
                        lp_this->m_KeyList.RemoveAt_IO( lp_io );    // do not leak the IO slot
                        continue;
                    }

                    lp_new_key->socket  = lp_io->socket;
                    lp_io->pKey         = lp_new_key;           // remember the matching key
                    lp_io->dwAcceptTime = ::GetTickCount();     // time of the accept

                    // Associate the new socket with the completion port.
                    hRet = CreateIoCompletionPort( ( HANDLE )lp_io->socket, lp_this->m_h_iocp, (ULONG_PTR)lp_new_key, 0 );
                    if ( NULL == hRet )
                    {
                        closesocket( lp_io->socket );
                        lp_this->m_KeyList.RemoveAt_Key( lp_new_key );
                        lp_this->m_KeyList.RemoveAt_IO( lp_io );
                        continue;
                    }

                    lp_this->HandleData( lp_io, lp_new_key, IOCP_COMPLETE_ACCEPT );
                    lp_this->DataAction( lp_io, lp_new_key );
                }
                break;

            case IOCP_READ:
                {
                    if ( SOCKET_STATE_CONNECT_AND_READ != lp_io->state )
                        lp_io->state = SOCKET_STATE_CONNECT_AND_READ;

                    lp_io->wsaBuf.len = dwBytes;    // HandleData adds this to lRecvLen
                    lp_this->HandleData( lp_io, lp_key, IOCP_COMPLETE_READ );
                    lp_this->DataAction( lp_io, lp_key );
                }
                break;

            case IOCP_WRITE:
                lp_this->HandleData( lp_io, lp_key, IOCP_COMPLETE_WRITE );
                break;

            default:
                break;
            }
        }
        return 0;
    }
    catch (... )
    {
        LOG_APPERROR(_T("T"));
        return 1;
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Close both listening sockets.
-------------------------------------------------------------------------------------------*/
void CIOCP::CloseMainSocket()
{
    CloseOneSocket(m_listen_socket);
    CloseOneSocket(m_listen_socket2);
}

/*-------------------------------------------------------------------------------------------
Purpose : Bind listening socket 2 to the configured address and DEFAULT_LISTENPORT2 and start
          listening. Sets m_bInitSocket2 on success.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::BindAndListenSocket2()
{
    SOCKADDR_IN addr;
    memset( &addr, 0, sizeof(SOCKADDR_IN) );
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = m_pDlg->m_Setup.addr;
    addr.sin_port        = htons( DEFAULT_LISTENPORT2 );

    if ( SOCKET_ERROR == bind( m_listen_socket2, (SOCKADDR*)&addr, sizeof( addr ) ) )
        return FALSE;

    if ( SOCKET_ERROR == listen( m_listen_socket2, 20 ) )
        return FALSE;

    m_bInitSocket2 = true;
    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Close a socket (if it is valid) and reset the caller's variable to INVALID_SOCKET.
-------------------------------------------------------------------------------------------*/
void CIOCP::CloseOneSocket(SOCKET &socket)
{
    if ( socket != INVALID_SOCKET )
    {
        closesocket(socket);
        socket = INVALID_SOCKET;
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Post a batch of overlapped AcceptEx calls on listening socket 2.
Notes   : Same as PostAcceptEx() but for m_listen_socket2 and without changing the socket
          buffer sizes. 10 accepts in _DEBUG builds, 30 otherwise.
Returns : always TRUE
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::PostAcceptEx2()
{
#ifdef _DEBUG
    int count = 10;
#else
    int count = 30;
#endif

    DWORD dwBytes = 0;
    BOOL  bRet;

    for ( int i = 0; i < count; i++ )
    {
        SOCKET sAccept = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
        if ( INVALID_SOCKET == sAccept )
            continue;

        IOCP_IO_PTR lp_io = m_KeyList.GetBlank_IO();
        if ( lp_io == NULL )
        {
            closesocket( sAccept );     // no free IO slot: do not leak the socket
            continue;
        }

        InitIoContext( lp_io );
        lp_io->socket    = sAccept;
        lp_io->operation = IOCP_ACCEPT;
        lp_io->state     = SOCKET_STATE_NOT_CONNECT;

        bRet = lpAcceptEx( m_listen_socket2, lp_io->socket, lp_io->buf,
                           0,                               // do not wait for initial data
                           sizeof(SOCKADDR_IN) + 16,
                           sizeof(SOCKADDR_IN) + 16,
                           &dwBytes, &lp_io->ol );
        if ( ( bRet == FALSE ) && ( WSA_IO_PENDING != WSAGetLastError() ) )
        {
            closesocket( sAccept );
            m_KeyList.RemoveAt_IO( lp_io );
            continue;
        }
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Register FD_ACCEPT on listening socket 2 with m_h_accept_event2.
Returns : TRUE on success, FALSE on failure
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::RegAcceptEvent2()
{
    m_h_accept_event2 = CreateEvent( NULL, FALSE, FALSE, NULL );
    if ( NULL == m_h_accept_event2 )
        return FALSE;

    if ( WSAEventSelect( m_listen_socket2, m_h_accept_event2, FD_ACCEPT ) != 0 )
    {
        CloseHandle( m_h_accept_event2 );
        m_h_accept_event2 = NULL;       // do not leave a stale handle behind
        return FALSE;
    }

    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose : Split the bytes buffered in lp_io (buf[0 .. lRecvLen)) into Socket_Head frames and
          dispatch every complete frame.
Notes   : A frame is a Socket_Head followed by lCompressLen bytes (if bCompress is set) or
          lFrameLen bytes. SMSFUNC_TEST frames from a logged-in user (m_lUserID > 0) are
          answered directly; every other frame is copied into a new CProcessSocket bound to
          a free SQL slot and queued to the system thread pool. If no SQL slot is free the
          frame is skipped. Afterwards the consumed bytes are removed from the front of the
          buffer and lRecvLen is updated.
          NOTE(review): the ANS_Test reply and the queued CProcessSocket are not freed here;
          presumably SendFrame / IOCP_Process take ownership - confirm.
Returns : number of frames queued for processing (>= 0), or -1 if the data is malformed or
          an exception was caught (the caller then drops the connection)
-------------------------------------------------------------------------------------------*/
long CIOCP::GetFrame(IOCP_IO_PTR lp_io)
{
    if ( !lp_io )
        return -1;

    try
    {
        int iGetFrame = 0;
        int i = 0;                      // offset of the first unconsumed byte

        while ( i < lp_io->lRecvLen )
        {
            if ( i + sizeof(Socket_Head) > (DWORD)lp_io->lRecvLen )
                break;                  // header not complete yet

            Socket_Head SHead = {0};
            memcpy( &SHead, lp_io->buf + i, sizeof(Socket_Head) );
            const int iBody = i + (int)sizeof(Socket_Head);

            // Basic sanity check of the header fields.
            if ( SHead.lFuncType <= 0 || SHead.lFuncType > 100000
                 || SHead.lCompressLen < 0 || SHead.lCompressLen > BUFFER_SIZE
                 || SHead.lFrameLen < 0 || SHead.lFrameLen > BUFFER_SIZE )
            {
                return -1;              // malformed: close the connection
            }

            long lGetLen = SHead.lFrameLen;
            if ( SHead.bCompress )
                lGetLen = SHead.lCompressLen;

            if ( iBody + lGetLen > lp_io->lRecvLen )
                break;                  // body not complete yet; i stays at the frame start

            lp_io->dwEndRecvFrame = ::GetTickCount();

            if ( SHead.lFuncType == SMSFUNC_TEST && lp_io->m_lUserID > 0 )
            {
                // Keep-alive: answer directly.
                ANS_Test * pTestRet = new ANS_Test;
                pTestRet->lTemp = ::GetTickCount();
                CSmsCenterDlg::SendFrame(lp_io, SMSFUNC_TEST , (BYTE*)pTestRet , sizeof(ANS_Test) );
            }
            else
            {
                IOCP_SQL_PTR pSql = m_KeyList.GetBlank_SQL();
                if ( pSql )
                {
                    try
                    {
                        CProcessSocket * pProcess = new CProcessSocket;
                        if ( pProcess )
                        {
                            pProcess->SetDlg(m_pDlg);
                            pProcess->IOCP_setSQL(pSql);

                            Cmpp_Head head = {0};
                            BOOL bOK = pProcess->IOCP_GetFrameData(lp_io , SHead , head, (BYTE*)(lp_io->buf + iBody) , lGetLen);

                            // Hand the frame to a pool thread; if it cannot be queued the
                            // processor and the SQL slot must be released here.
                            if ( bOK && QueueUserWorkItem(CProcessSocket::IOCP_Process, (PVOID)pProcess, WT_EXECUTELONGFUNCTION) )
                            {
                                iGetFrame ++;
                            }
                            else
                            {
                                pProcess->m_bUse    = false;
                                pProcess->m_lRandID = 0;
                                pSql->bUse          = false;
                                pSql->lRandID       = 0;
                                delete pProcess;
                            }
                        }
                        else
                        {
                            AfxMessageBox(_T("new Process error"));
                            pSql->bUse    = false;
                            pSql->lRandID = 0;
                        }
                    }
                    catch(...)
                    {
                        LOG_APPERROR(_T("T"));
                        pSql->bUse    = false;      // exception: hand the SQL slot back
                        pSql->lRandID = 0;
                    }
                }
            }

            i = iBody + lGetLen;
        }

        if ( i >= lp_io->lRecvLen )
        {
            lp_io->lRecvLen = 0;        // everything consumed
        }
        else if ( i > 0 )
        {
            // Move the unconsumed tail to the front. The ranges may overlap: memmove.
            lp_io->lRecvLen = lp_io->lRecvLen - i;
            memmove( lp_io->buf , lp_io->buf + i , lp_io->lRecvLen );
        }

        return iGetFrame;
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
    return -1;
}

/*-------------------------------------------------------------------------------------------
Purpose : Split the bytes buffered in lp_io into CMPP frames (Cmpp_Head with big-endian
          fields; Total_Length includes the header).
Notes   : Frame dispatch for this protocol is currently disabled: complete frames are only
          measured, time-stamped (dwEndRecvFrame) and skipped, so the returned count is 0.
          The consumed bytes are removed from the front of the buffer.
Returns : number of frames queued (currently always 0), or -1 if the data is malformed or an
          exception was caught (the caller then drops the connection)
-------------------------------------------------------------------------------------------*/
long CIOCP::GetFrame_Cmpp(IOCP_IO_PTR lp_io)
{
    if ( !lp_io )
        return -1;

    try
    {
        int iGetFrame = 0;
        int i = 0;                      // offset of the first unconsumed byte

        while ( i < lp_io->lRecvLen )
        {
            if ( i + sizeof(Cmpp_Head) > (DWORD)lp_io->lRecvLen )
                break;                  // header not complete yet

            Cmpp_Head SHead = {0};
            memcpy( &SHead, lp_io->buf + i, sizeof(Cmpp_Head) );
            const int iBody = i + (int)sizeof(Cmpp_Head);

            SHead.Command_Id   = ntohl(SHead.Command_Id);
            SHead.Sequence_Id  = ntohl(SHead.Sequence_Id);
            SHead.Total_Length = ntohl(SHead.Total_Length);

            // Total_Length covers the header too, so anything shorter than the header is
            // malformed (and would yield a negative body length).
            if ( SHead.Total_Length <= 0 || SHead.Total_Length > BUFFER_SIZE
                 || (size_t)SHead.Total_Length < sizeof(Cmpp_Head) )
            {
                return -1;              // malformed: close the connection
            }

            long lGetLen = (long)( SHead.Total_Length - sizeof(Cmpp_Head) );

            if ( iBody + lGetLen > lp_io->lRecvLen )
                break;                  // body not complete yet; i stays at the frame start

            lp_io->dwEndRecvFrame = ::GetTickCount();

            // (CMPP frame dispatch is disabled.)

            i = iBody + lGetLen;
        }

        if ( i >= lp_io->lRecvLen )
        {
            lp_io->lRecvLen = 0;        // everything consumed
        }
        else if ( i > 0 )
        {
            // Move the unconsumed tail to the front. The ranges may overlap: memmove.
            lp_io->lRecvLen = lp_io->lRecvLen - i;
            memmove( lp_io->buf , lp_io->buf + i , lp_io->lRecvLen );
        }

        return iGetFrame;
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
    return -1;
}

/*-------------------------------------------------------------------------------------------
Purpose : Remember the owning dialog (used for logging, configuration and channel control).
-------------------------------------------------------------------------------------------*/
void CIOCP::SetDlg(CSmsCenterDlg *pDlg)
{
    m_pDlg = pDlg;
}

/*-------------------------------------------------------------------------------------------
Purpose : Reset the OVERLAPPED of a send context before it is used for WSASend.
-------------------------------------------------------------------------------------------*/
void CIOCP::InitIoContext_Send(IOCP_IO_SEND_PTR lp_io)
{
    if ( !lp_io )
        return;

    memset( &lp_io->ol, 0, sizeof( WSAOVERLAPPED ) );
}

/*-------------------------------------------------------------------------------------------
Purpose : Close SQL connections that have been idle or busy for too long.
Notes   : Scans the MAX_SQLCONNECT slots of m_KeyList.m_SQL. An open connection that is in
          use (bUse set and lRandID > 0) is closed once dwEndUse is older than
          SQL_TIMEOUT_USE; an open connection that is not in use is closed once dwEndUse is
          older than SQL_TIMEOUT_DISUSE. The slot is marked free and the event is logged.
Returns : nothing
-------------------------------------------------------------------------------------------*/
void CIOCP::CheckForInvalidSQLConnection()
{
    try
    {
        TCHAR szMsg[256] = {0};
        DWORD dwNow = ::GetTickCount();

        IOCP_SQL_PTR pSql = m_KeyList.m_SQL;
        if ( !pSql )
            return;

        for ( int i = 0; i < MAX_SQLCONNECT; i++ )
        {
            if ( !pSql[i].adoConnection.IsOpen() )
                continue;

            const bool   bInUse  = ( pSql[i].bUse && pSql[i].lRandID > 0 );
            const DWORD  dwLimit = bInUse ? SQL_TIMEOUT_USE : SQL_TIMEOUT_DISUSE;
            const TCHAR *pszFmt  = bInUse ? _T("SQL:%d 操作超时,自动断连!")
                                          : _T("SQL:%d 超时未使用,自动断连!");

            if ( dwNow - pSql[i].dwEndUse > dwLimit )
            {
                pSql[i].adoConnection.Close();
                pSql[i].bUse    = false;
                pSql[i].lRandID = 0;
                _stprintf(szMsg, pszFmt, i );
                m_pDlg->AddLog(szMsg);
            }
        }
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose : Housekeeping thread 2. Periodically runs the idle-connection and idle-SQL checks.
Notes   : lp_param is the CIOCP instance. The loop sleeps 500 ms per pass; the connection
          check runs every 11th pass and the SQL check every 121st pass, until m_bQuit is
          set.
Returns : TRUE on normal exit, false if an exception was caught
-------------------------------------------------------------------------------------------*/
DWORD WINAPI CIOCP::MainLoop2(LPVOID lp_param)
{
    try
    {
        AutoCoInitializeEx AutoCoInit;      // initialises / tears down COM for this thread
        CIOCP * piocp = (CIOCP *)lp_param;

        int nCount  = 0;
        int nCount2 = 0;

        while ( !piocp->m_bQuit )
        {
            if ( ++nCount > 10 )            // roughly every 5 seconds
            {
                nCount = 0;
                piocp->CheckForInvalidConnection();
            }

            if ( ++nCount2 > 120 )          // roughly every minute
            {
                nCount2 = 0;
                piocp->CheckForInvalidSQLConnection();
            }

            Sleep(500);
        }
        return TRUE;
    }
    catch(...)
    {
        LOG_APPERROR(_T("T"));
        return false;
    }
}