#include "stdafx.h"
#include "IOCP.h"
#pragma comment(lib, "ws2_32.lib")
#include "..\SmsCenterDlg.h"

// NOTE(review): This file reached review with its line breaks collapsed and with
// several stretches of text missing. Every gap starts at a '<' that was directly
// followed by a letter and ends at the next '>', which looks like the work of an
// HTML-tag stripper -- TODO confirm and restore the lost text from version control.
// Each gap is marked [TRUNCATED IN SOURCE]; the tokens around a gap are reproduced
// exactly as found and are deliberately NOT repaired, so the affected functions do
// not compile as shown here.

/////////////////////////////////////////////////////////////////////////////////////////////

// Waits up to timeoutMs for a thread to exit, force-terminates it on timeout,
// then closes the handle and resets it to INVALID_HANDLE_VALUE.
static void StopThreadAndClose( HANDLE &hThread, DWORD timeoutMs )
{
    if ( hThread == INVALID_HANDLE_VALUE )
        return;
    if ( hThread != NULL )
    {
        if ( WaitForSingleObject( hThread, timeoutMs ) == WAIT_TIMEOUT )
            TerminateThread( hThread, 0 );   // did not exit in time: kill it
        CloseHandle( hThread );
    }
    hThread = INVALID_HANDLE_VALUE;
}

// Puts the object into its "not initialised" state. m_h_iocp, m_h_thread[],
// m_n_thread_count and m_h_accept_event are not assigned here.
CIOCP::CIOCP()
{
    m_pDlg = NULL;
    m_listen_socket = INVALID_SOCKET;
    m_listen_socket2 = INVALID_SOCKET;
    m_bInitSocket = false;
    m_bInitSocket2 = false;
    m_bInit = false;
    m_h_MainLoop = INVALID_HANDLE_VALUE;
    m_h_MainLoop2 = INVALID_HANDLE_VALUE;
    m_bQuit = false;
}

CIOCP::~CIOCP()
{
    Close();
}

/*-------------------------------------------------------------------------------------------
Purpose: stop all threads and release the resources owned by this object.
Notes:   does nothing unless m_bInit is set. Exceptions are logged and swallowed.
-------------------------------------------------------------------------------------------*/
void CIOCP::Close()
{
    try
    {
        if ( !m_bInit )
            return;

        m_bQuit = true;   // every loop in this class polls this flag

        StopThreadAndClose( m_h_MainLoop, 3000 );
        StopThreadAndClose( m_h_MainLoop2, 3000 );

        // Wake every worker: each one re-checks m_bQuit as soon as
        // GetQueuedCompletionStatus returns. One packet per worker is needed
        // because a packet is consumed by exactly one waiting thread.
        int i;
        for( i = 0; i < m_n_thread_count; i++ )
            PostQueuedCompletionStatus( m_h_iocp, 0, 0, NULL );

        // Join the workers first and only then close the port they wait on.
        CloseThreadHandle( m_n_thread_count );
        CloseHandle( m_h_iocp );

        Sleep(200);   // give anything still running a moment to finish
        m_KeyList.Close();
        CloseMainSocket();
        m_bInit = false;
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose: reset an IO node so it can be used for a new receive.
Notes:   NULL and pointers outside m_KeyList.m_IO are ignored.
-------------------------------------------------------------------------------------------*/
void CIOCP::InitIoContext(IOCP_IO_PTR lp_io)
{
    if ( !lp_io )
        return ;
    try
    {
        // NOTE(review): the upper bound is inclusive, so &m_IO[MAX_LOGINUSER]
        // is accepted. That is one past the end unless m_IO is declared with
        // MAX_LOGINUSER+1 elements -- confirm against the m_KeyList declaration.
        if ( lp_io< &m_KeyList.m_IO[0] || lp_io>&m_KeyList.m_IO[MAX_LOGINUSER] )
            return ;

        memset( &lp_io->ol, 0, sizeof( WSAOVERLAPPED ) );
        lp_io->wsaBuf.buf = lp_io->buf;
        lp_io->wsaBuf.len = BUFFER_SIZE;
        lp_io->lRecvLen = 0;
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose: create both listening sockets and associate each with the completion port.
Returns: TRUE on success. On any failure everything created so far is released
         again and FALSE is returned.
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::InitSocket()
{
    // ---- listener 1 ----
    m_listen_socket = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
    if( INVALID_SOCKET == m_listen_socket )
    {
        return FALSE;
    }

    IOCP_KEY_PTR lp_key = m_KeyList.GetBlank_Key();
    if ( !lp_key )
    {
        CloseOneSocket( m_listen_socket );   // previously leaked on this path
        return FALSE;
    }
    lp_key->socket = m_listen_socket;

    HANDLE hRet = CreateIoCompletionPort( (HANDLE)m_listen_socket, m_h_iocp, (ULONG_PTR)lp_key, 0 );
    if( hRet == NULL )
    {
        CloseOneSocket( m_listen_socket );
        m_KeyList.RemoveAt_Key( lp_key );
        return FALSE;
    }

    // ---- listener 2 ----
    BOOL bSecondOk = FALSE;
    IOCP_KEY_PTR lp_key2 = NULL;

    m_listen_socket2 = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
    if( INVALID_SOCKET != m_listen_socket2 )
    {
        lp_key2 = m_KeyList.GetBlank_Key();
        if ( lp_key2 )   // was dereferenced unchecked before
        {
            lp_key2->socket = m_listen_socket2;
            hRet = CreateIoCompletionPort( (HANDLE)m_listen_socket2, m_h_iocp, (ULONG_PTR)lp_key2, 0 );
            bSecondOk = ( hRet != NULL );
        }
    }

    if ( !bSecondOk )
    {
        // Undo listener 1 as well so a failed call leaves nothing half-open.
        CloseOneSocket( m_listen_socket );
        m_KeyList.RemoveAt_Key( lp_key );
        if ( INVALID_SOCKET != m_listen_socket2 )
            CloseOneSocket( m_listen_socket2 );
        if ( lp_key2 )
            m_KeyList.RemoveAt_Key( lp_key2 );
        return FALSE;
    }
    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose: stop and close the first `count` worker threads in m_h_thread.
Notes:   each thread gets 1 second to exit before it is force-terminated.
-------------------------------------------------------------------------------------------*/
void CIOCP::CloseThreadHandle(int count )
{
    if( count <= 0 )
    {
        return;
    }
    for( int i= 0; i < count; i++ )
    {
        StopThreadAndClose( m_h_thread[i], 1000 );
    }
}

/*-------------------------------------------------------------------------------------------
Purpose (from the original header): bind the listening socket to the configured
         IP/port and start listening.
Returns: TRUE on success, FALSE on failure.
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::BindAndListenSocket()
{
    SOCKADDR_IN addr;
    memset( &addr, 0, sizeof(SOCKADDR_IN) );
    addr.sin_family = AF_INET;
    //addr.sin_addr.s_addr = inet_addr( ADDR );
    //addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_addr.s_addr = m_pDlg->m_Setup.addr;
#ifdef UNICODE
    addr.sin_port = htons( DEFAULT_SERVERPORT_UNI );
#else
    addr.sin_port = htons( DEFAULT_LISTENPORT );
#endif
    int nRet;
    nRet = bind( m_listen_socket, (SOCKADDR*)&addr, sizeof( SOCKADDR ) );
    if( SOCKET_ERROR == nRet)
    {
        //cout<<"bind fail!"<
// [TRUNCATED IN SOURCE] The rest of BindAndListenSocket and everything up to the
// middle of a thread-start routine is missing. The text resumes immediately
// after a '>' character.
// NOTE(review): CreateThread reports failure with NULL, not INVALID_HANDLE_VALUE,
// so the failure check below can never fire -- fix when the function is restored.
MAXTHREAD_COUNT )
        m_n_thread_count = MAXTHREAD_COUNT;
    //m_n_thread_count = 4;
    for( i = 0; i < m_n_thread_count; i++ )
    {
        m_h_thread[i] = CreateThread( NULL, 0, CompletionRoutine, (LPVOID)this, 0, NULL );
        if( INVALID_HANDLE_VALUE == m_h_thread[i] )
        {
            CloseThreadHandle( i );
            CloseHandle( m_h_iocp );
            return FALSE;
        }
    }
    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose (from the original header): post a batch of AcceptEx requests on the
         first listening socket (10 in debug builds, 30 otherwise).
Returns: TRUE on success, FALSE on failure.
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::PostAcceptEx()
{
#ifdef _DEBUG
    int count = 10;
#else
    int count = 30;
#endif
    DWORD dwBytes;
    BOOL bRet;
    for( int i = 0; i < count; i++ )
    {
        SOCKET socket = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
        if( INVALID_SOCKET == socket )
        {
            //cout<<"post accept ex fail!"<
// [TRUNCATED IN SOURCE] Text is missing here (the failure branch and the code
// that obtains lp_io). The text resumes immediately after a '>' character.
socket = socket;
        lp_io->operation = IOCP_ACCEPT;
        lp_io->state = SOCKET_STATE_NOT_CONNECT;
        /////////////////////////////////////////////////
        bRet = lpAcceptEx( m_listen_socket, lp_io->socket, lp_io->buf,
            // lp_io->wsaBuf.len - 2 * ( sizeof(SOCKADDR_IN) + 16 ),
            0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes,&lp_io->ol);
        if( ( bRet == FALSE ) && ( WSA_IO_PENDING != WSAGetLastError() ) )
        {
            closesocket( socket );
            //m_io_group.RemoveAt( lp_io );
            m_KeyList.RemoveAt_IO( lp_io );
            //cout<<"post acceptex fail:"<
// [TRUNCATED IN SOURCE] The rest of PostAcceptEx and the head of HandleData
// (signature, switch header, start of the first case) are missing. The text
// resumes immediately after a '>' character. What follows is the body of
// HandleData's switch over the completion type.
operation = IOCP_READ;
        }
        break;
    case IOCP_COMPLETE_ACCEPT_READ:
        {
            // (body was already commented out in the original)
        }
        break;
    case IOCP_COMPLETE_READ:
        {
            //cout<<"read a data!*******************************"<
// [TRUNCATED IN SOURCE] Text is missing here; it resumes immediately after a '>'.
lRecvLen += lp_io->wsaBuf.len;
            long lRet=0;
            // Connections on the second listening port use the CMPP framing.
            if ( lp_io->m_lLocalPort == DEFAULT_LISTENPORT2 )
                lRet = GetFrame_Cmpp(lp_io);
            else
                lRet = GetFrame(lp_io);
            // NOTE(review): in both failure branches the node is handed back with
            // RemoveAt_IO before lp_io->socket is read for closesocket -- confirm
            // that RemoveAt_IO does not recycle or clear the node.
            if ( lRet < 0 )
            {
                // frame parsing failed
                m_KeyList.RemoveAt_IO( lp_io );
                m_KeyList.RemoveAt_Key( lp_key );
                closesocket( lp_io->socket );
                lp_io->operation = IOCP_END;   // closed: nothing more to post
            }
            else
            {
                // buffer is full and still holds no complete frame
                if ( lp_io->lRecvLen>=BUFFER_SIZE )
                {
                    m_KeyList.RemoveAt_IO( lp_io );
                    m_KeyList.RemoveAt_Key( lp_key );
                    closesocket( lp_io->socket );
                    lp_io->operation = IOCP_END;   // closed: nothing more to post
                }
                else
                {
                    // post the next read behind the bytes already buffered
                    lp_io->operation = IOCP_READ;
                    memset( &lp_io->ol, 0, sizeof(lp_io->ol) );
                    lp_io->wsaBuf.buf = lp_io->buf + lp_io->lRecvLen;
                    lp_io->wsaBuf.len = BUFFER_SIZE-lp_io->lRecvLen;
                }
            }
        }
        break;
    case IOCP_COMPLETE_WRITE:
        {
            try
            {
                // NOTE(review): scalar delete on wsaBuf.buf -- if the sender
                // allocates it with new[], this must be delete[]. Confirm
                // against the code that builds IOCP_IO_SEND nodes.
                IOCP_IO_SEND_PTR ioS = (IOCP_IO_SEND_PTR)lp_io;
                if ( ioS->wsaBuf.len >0 )
                {
                    if ( ioS->wsaBuf.buf )
                        delete ioS->wsaBuf.buf;
                    ioS->wsaBuf.buf = NULL;
                }
                delete ioS;
            }
            catch (...)
            {
                LOG_APPERROR(_T("T"));
                break;
            }
        }
        break;
    default:
        {
            //cout<<"handleData do nothing!*********************"<
// [TRUNCATED IN SOURCE] The rest of HandleData and the head of DataAction
// (signature, locals, `switch( lp_io->` ...) are missing. The text resumes
// immediately after a '>' character. What follows is the body of DataAction's
// switch over the operation to post.
operation )
    {
    case IOCP_WRITE:
        {
            nRet = WSASend( lp_io->socket, &lp_io->wsaBuf, 1, &dwBytes, 0, &lp_io->ol,NULL);
            if( ( nRet == SOCKET_ERROR ) && ( WSAGetLastError() != WSA_IO_PENDING ) )
            {
                m_KeyList.RemoveAt_IO( lp_io );
                m_KeyList.RemoveAt_Key( lp_key );
                closesocket( lp_io->socket );
                return FALSE;
            }
        }
        break;
    case IOCP_WRITE2:
        {
            // (body was already commented out in the original)
        }
        break;
    case IOCP_READ:
        {
            //cout<<"post a read data!-----------------------------------------"<
// [TRUNCATED IN SOURCE] Text is missing here (start of the receive call); it
// resumes immediately after a '>' character.
socket, &lp_io->wsaBuf, 1, &dwBytes, &dwFlags, &lp_io->ol,NULL);
            if( ( nRet == SOCKET_ERROR ) && ( WSAGetLastError() != WSA_IO_PENDING ) )
            {
                m_KeyList.RemoveAt_IO( lp_io );
                m_KeyList.RemoveAt_Key( lp_key );
                closesocket( lp_io->socket );
                return FALSE;
            }
        }
        break;
    case IOCP_END:
        break;
    default:
        {
            //cout<<"DataAction do nothing!------------------------------------------"<
// [TRUNCATED IN SOURCE] The rest of DataAction and the head of the connection
// time-out check (MainLoop2 calls it as CheckForInvalidConnection) are missing:
// its signature, the declarations of szMsg / dwNow / lp_io / i and the loop
// header. The text resumes immediately after a '>' character.
0 )
            {
                if (lp_io[i].dwAcceptTime>0 && lp_io[i].state != SOCKET_STATE_CONNECT_AND_READ )
                {
                    // (an older SO_CONNECT_TIME based check was commented out here)
                    // NOTE(review): the original comment says "10 second connect
                    // time-out" but the threshold is 5000 ms.
                    if ( dwNow - lp_io[i].dwAcceptTime >5000 )
                    {
                        _stprintf(szMsg , _T("[%d]%s:%d 连接超时!") , lp_io[i].m_lLocalPort,lp_io[i].m_szIP,lp_io[i].m_lPort);
                        m_pDlg->AddLog(szMsg);
                        closesocket( lp_io[i].socket );
                    }
                }
                if ( lp_io[i].state == SOCKET_STATE_CONNECT_AND_READ )
                {
                    if ( lp_io[i].dwEndRecvFrame>0 )
                    {
                        if ( dwNow - lp_io[i].dwEndRecvFrame > RECVREFRESH_TIMEOUT )
                        {
                            lp_io[i].m_lID = 199992;   // original comment: marker for "dropped because the line went dead"
                            closesocket( lp_io[i].socket );
                        }
                    }
                    else
                    {
                        // NOTE(review): the original comment says "30 seconds without
                        // a valid packet" but the threshold is 40000 ms.
                        if ( dwNow - lp_io[i].dwAcceptTime > 40000 )
                        {
                            _stprintf(szMsg , _T("[%d]%s:%d 接收数据超时!") , lp_io[i].m_lLocalPort,lp_io[i].m_szIP,lp_io[i].m_lPort);
                            m_pDlg->AddLog(szMsg);
                            closesocket( lp_io[i].socket );
                        }
                    }
                }
            }
        }
    }
    catch (...)
    {
        LOG_APPERROR(_T("T"));
    }
}

/*-------------------------------------------------------------------------------------------
Purpose: create m_h_accept_event (auto-reset, initially unsignalled) and register
         it for FD_ACCEPT on the first listening socket. Per the original header
         this is how the class learns that all posted accepts have been used up.
Returns: TRUE on success, FALSE on failure (the event is closed and reset to NULL).
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::RegAcceptEvent()
{
    m_h_accept_event = CreateEvent( NULL, FALSE, FALSE, NULL );
    if( NULL == m_h_accept_event )
    {
        return FALSE;
    }

    const int nRet = WSAEventSelect( m_listen_socket, m_h_accept_event, FD_ACCEPT );
    if( nRet != 0 )
    {
        CloseHandle( m_h_accept_event );
        m_h_accept_event = NULL;   // do not leave a closed handle behind
        return FALSE;
    }
    return TRUE;
}

/*-------------------------------------------------------------------------------------------
Purpose: read the peer address stored in the last sizeof(SOCKADDR_IN)+16 bytes of a
         BUFFER_SIZE receive buffer.
Params:  buf  - start of the buffer
         ip   - receives the dotted IPv4 string; the caller must supply enough room
         port - receives the port in host byte order
Returns: always TRUE.
-------------------------------------------------------------------------------------------*/
BOOL CIOCP::GetAddrAndPort(char*buf,char ip[],UINT &port)
{
    const int nAddrOffset = BUFFER_SIZE - sizeof( SOCKADDR_IN ) - 16;

    SOCKADDR_IN addr;
    memcpy( &addr, buf + nAddrOffset, sizeof( SOCKADDR_IN ) );

    // sin_port is a 16-bit field: ntohs is the matching conversion. The former
    // ntohl swapped it as a 32-bit value and produced a wrong port number.
    port = ntohs( addr.sin_port );
    strcpy( ip, inet_ntoa( addr.sin_addr ) );
    return TRUE;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////// /*------------------------------------------------------------------------------------------- 函数功能:初始化完成端口及相关的所有东西,并发出每一个10个连接. 函数说明: 函数返回:成功,TRUE;失败,FALSE -------------------------------------------------------------------------------------------*/ BOOL CIOCP::Init() { BOOL b1; BOOL b2; WSAData data; if( WSAStartup( MAKEWORD(2,2),&data) != 0 ) { //cout<<"WSAStartup fail!"<Close(); return false; } //if( m_bInitSocket2 ) //{ m_h_MainLoop2 = CreateThread( NULL, 0, MainLoop2, (LPVOID)this, 0, NULL ); if ( m_h_MainLoop2 == INVALID_HANDLE_VALUE ) //主线程创建不成功 { this->Close(); return false; } //} return TRUE; } /*------------------------------------------------------------------------------------------- 函数功能:主循环 函数说明: 函数返回:成功,TRUE;失败,FALSE -------------------------------------------------------------------------------------------*/ DWORD CIOCP::MainLoop(LPVOID lp_param) { try { AutoCoInitializeEx AutoCoInit; //自动初始化与结束Com环境 CIOCP * piocp = (CIOCP *)lp_param; DWORD dwRet; while( !piocp->m_bQuit ) { dwRet = WaitForSingleObject( piocp->m_h_accept_event, 300 ); switch( dwRet ) { case WAIT_FAILED: { CString str; str.Format( _T("Accept_Event Error:%d") , GetLastError()); piocp->m_pDlg->AddLog(str); piocp->m_pDlg->ON_TD_StopAll2(); //可能是资源不够造成失败,停止所有通道 piocp->PostAcceptEx(); //AfxMessageBox( _T("accept_event Error!") ); //PostQueuedCompletionStatus( m_h_iocp, 0, 0, NULL ); //return FALSE; } break; case WAIT_TIMEOUT: { //CheckForInvalidConnection(); } break; case WAIT_OBJECT_0: //接收到了所有发出的连接都用光了的消息,再次发出连接 { piocp->PostAcceptEx(); } break; } /* #ifndef UNICODE dwRet = WaitForSingleObject( piocp->m_h_accept_event2, 300 ); switch( dwRet ) { case WAIT_FAILED: { AfxMessageBox( _T("accept_event2 Error!") ); //PostQueuedCompletionStatus( m_h_iocp, 0, 0, NULL ); //return FALSE; } break; case WAIT_TIMEOUT: { //CheckForInvalidConnection(); } break; case WAIT_OBJECT_0: 
//接收到了所有发出的连接都用光了的消息,再次发出连接 { piocp->PostAcceptEx2(); } break; } #endif */ /* nCount ++; if ( nCount>5 ) //每5秒检测一次超时 { nCount=0; //每个event等待1秒,加起来就是2秒,可以检测连接没有数据的socket piocp->CheckForInvalidConnection(); } */ } return TRUE; } catch(...) { LOG_APPERROR(_T("T")); return false; } } /*------------------------------------------------------------------------------------------- 函数功能:数据处理线程函数 函数说明: 函数返回: -------------------------------------------------------------------------------------------*/ DWORD CIOCP::CompletionRoutine(LPVOID lp_param) { USES_CONVERSION; try { AutoCoInitializeEx AutoCoInit; //自动初始化与结束Com环境 CIOCP* lp_this = (CIOCP*)lp_param; int nRet; BOOL bRet; DWORD dwBytes = 0; HANDLE hRet; IOCP_KEY_PTR lp_key = NULL; IOCP_IO_PTR lp_io = NULL; LPWSAOVERLAPPED lp_ov = NULL; IOCP_KEY_PTR lp_new_key = NULL; while( TRUE ) { bRet = GetQueuedCompletionStatus( lp_this->m_h_iocp, &dwBytes, (PULONG_PTR)&lp_key, &lp_ov, INFINITE ); if ( lp_this->m_bQuit ) //收到退出信号 return 0; lp_io = (IOCP_IO_PTR)lp_ov; if ( lp_io && !AfxIsValidAddress(&lp_io->operation,4,true) ) //收到的lp_io有问题 continue; if ( (bRet==0 || dwBytes ==0 ) && lp_io && lp_io->operation== IOCP_WRITE ) //删除不成功的Write { lp_this->HandleData( lp_io, lp_key, IOCP_COMPLETE_WRITE ); continue; } if( bRet == 0 ) { if ( lp_key && lp_io && lp_io->lRandID>0 ) { lp_this->m_KeyList.RemoveAt_IO( lp_io ); lp_this->m_KeyList.RemoveAt_Key( lp_key ); continue; } if ( (!lp_key && !lp_io) ) { continue; //return 0; //已退出 } //cout << "GetQueuedCompletionStatus() failed: " << GetLastError() << endl; //continue; } if( NULL == lp_key ) { continue; //return 0; } if( lp_io == NULL ) { //cout<<"recv a null CIoContext!"<operation!=IOCP_WRITE && (lp_io< &lp_this->m_KeyList.m_IO[0] || lp_io>&lp_this->m_KeyList.m_IO[MAX_LOGINUSER]) ) continue; if ( lp_key && (lp_key< &lp_this->m_KeyList.m_Key[0] || lp_key>&lp_this->m_KeyList.m_Key[MAX_LOGINUSER]) ) continue; if( ( IOCP_ACCEPT != lp_io->operation ) && ( 0 == dwBytes ) ) { closesocket( lp_io->socket 
); //lp_this->m_io_group.RemoveAt( lp_io ); //lp_this->m_key_group.RemoveAt( lp_key ); lp_this->m_KeyList.RemoveAt_IO( lp_io ); lp_this->m_KeyList.RemoveAt_Key( lp_key ); //MSG("一个用户退出了"); continue; } switch( lp_io->operation ) { case IOCP_ACCEPT: { //cout<<"接收到一个连接"<state = SOCKET_STATE_CONNECT; if( dwBytes > 0 ) { lp_io->state = SOCKET_STATE_CONNECT_AND_READ; //cout<<"读取到一条数据"<lpGetAcceptExSockAddrs(lp_io->buf,0,sizeof(SOCKADDR_IN) + 16,sizeof(SOCKADDR_IN) + 16,&local,&local_len,&remote,&remote_len); SOCKADDR_IN * pAA = ((SOCKADDR_IN *)local); SOCKADDR_IN * pBB = ((SOCKADDR_IN *)remote); #ifdef UNICODE _tcscpy(lp_io->m_szIP,A2W(inet_ntoa(pBB->sin_addr))); #else strcpy(lp_io->m_szIP,inet_ntoa(pBB->sin_addr)); #endif lp_io->m_lPort = ntohs(pBB->sin_port); lp_io->m_lLocalPort = ntohs(pAA->sin_port); #ifdef UNICODE if (lp_io->m_lLocalPort == DEFAULT_SERVERPORT_UNI ) #else if (lp_io->m_lLocalPort == DEFAULT_LISTENPORT ) #endif { nRet = setsockopt( lp_io->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, ( char* )&lp_this->m_listen_socket, sizeof( lp_this->m_listen_socket ) ); } else { nRet = setsockopt( lp_io->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, ( char* )&lp_this->m_listen_socket2, sizeof( lp_this->m_listen_socket2 ) ); } if( SOCKET_ERROR == nRet ) { closesocket( lp_io->socket ); //lp_this->m_io_group.RemoveAt( lp_io ); lp_this->m_KeyList.RemoveAt_IO( lp_io ); //cout<<"update socket fail!"<m_key_group.GetBlank(); lp_new_key = lp_this->m_KeyList.GetBlank_Key(); if( lp_new_key == NULL ) { closesocket( lp_io->socket ); continue; } lp_new_key->socket = lp_io->socket; lp_io->pKey = lp_new_key; //记录对应的key参数 lp_io->dwAcceptTime = ::GetTickCount(); //Accept的时间 //将新建立的SOCKET同完成端口关联起来。 hRet = CreateIoCompletionPort( ( HANDLE )lp_io->socket, lp_this->m_h_iocp, (ULONG_PTR)lp_new_key,0 ); if( NULL == hRet ) { lp_this->m_KeyList.RemoveAt_Key( lp_new_key ); lp_this->m_KeyList.RemoveAt_IO( lp_io ); closesocket( lp_io->socket ); continue; } //处理数据 lp_this->HandleData( lp_io, 
lp_new_key,IOCP_COMPLETE_ACCEPT ); bRet = lp_this->DataAction( lp_io, lp_new_key ); if( FALSE == bRet ) { continue; } } break; case IOCP_READ: { if( SOCKET_STATE_CONNECT_AND_READ != lp_io->state ) { lp_io->state = SOCKET_STATE_CONNECT_AND_READ; } lp_io->wsaBuf.len = dwBytes; lp_this->HandleData( lp_io, lp_key, IOCP_COMPLETE_READ ); bRet = lp_this->DataAction( lp_io, lp_key ); if( FALSE == bRet ) { continue; } } break; case IOCP_WRITE: { lp_this->HandleData( lp_io, lp_key, IOCP_COMPLETE_WRITE ); if( FALSE == bRet ) { continue; } } break; default: { continue; } break; } } return 0; } catch (... ) { LOG_APPERROR(_T("T")); return 1; } } void CIOCP::CloseMainSocket() { CloseOneSocket(m_listen_socket); CloseOneSocket(m_listen_socket2); } BOOL CIOCP::BindAndListenSocket2() { SOCKADDR_IN addr; memset( &addr, 0, sizeof(SOCKADDR_IN) ); addr.sin_family = AF_INET; //addr.sin_addr.s_addr = inet_addr( ADDR ); //addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_addr.s_addr = m_pDlg->m_Setup.addr; addr.sin_port = htons( DEFAULT_LISTENPORT2 ); int nRet; nRet = bind( m_listen_socket2, (SOCKADDR*)&addr, sizeof( SOCKADDR ) ); if( SOCKET_ERROR == nRet ) { //cout<<"bind fail!"<socket = socket; lp_io->operation = IOCP_ACCEPT; lp_io->state = SOCKET_STATE_NOT_CONNECT; ///////////////////////////////////////////////// bRet = lpAcceptEx( m_listen_socket2, lp_io->socket, lp_io->buf, // lp_io->wsaBuf.len - 2 * ( sizeof(SOCKADDR_IN) + 16 ), 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes,&lp_io->ol); if( ( bRet == FALSE ) && ( WSA_IO_PENDING != WSAGetLastError() ) ) { closesocket( socket ); //m_io_group.RemoveAt( lp_io ); m_KeyList.RemoveAt_IO( lp_io ); //cout<<"post acceptex fail:"<lRecvLen ; i++ ) { if ( i+sizeof(Socket_Head) > (DWORD)lp_io->lRecvLen ) { //i--; break; } Socket_Head SHead={0}; memcpy(&SHead,lp_io->buf+i,sizeof(Socket_Head)); i=i+sizeof(Socket_Head); long lGetLen=SHead.lFrameLen; if ( SHead.bCompress ) lGetLen = SHead.lCompressLen; if ( SHead.lFuncType <=0 || 
SHead.lFuncType > 100000 || SHead.lCompressLen<0 || SHead.lCompressLen>BUFFER_SIZE || SHead.lFrameLen<0 || SHead.lFrameLen>BUFFER_SIZE ) //简单判断收到的帧是否有问题 { return -1; //数据不对,关闭连接 } if ( i+lGetLen>lp_io->lRecvLen) //数据不够 { i=i-sizeof(Socket_Head); //i--; break; } lp_io->dwEndRecvFrame = ::GetTickCount(); if ( SHead.lFuncType == SMSFUNC_TEST && lp_io->m_lUserID>0 ) //直接回复SMSFunc_Test { ANS_Test * pTestRet = new ANS_Test; pTestRet->lTemp = ::GetTickCount(); CSmsCenterDlg::SendFrame(lp_io, SMSFUNC_TEST , (BYTE*)pTestRet , sizeof(ANS_Test) ); //发送返回 } else { IOCP_SQL_PTR pSql = m_KeyList.GetBlank_SQL(); if ( pSql ) { try { //CProcessSocket * pProcess = m_KeyList.GetBlank_Process(); CProcessSocket * pProcess = new CProcessSocket; if ( pProcess ) { BOOL bOK = true; pProcess->SetDlg(m_pDlg); pProcess->IOCP_setSQL(pSql); Cmpp_Head head={0}; bOK = pProcess->IOCP_GetFrameData(lp_io , SHead , head,(BYTE*)(lp_io->buf+i) , lGetLen); if ( bOK ) //启动线程处理 { iGetFrame ++; QueueUserWorkItem(CProcessSocket::IOCP_Process, (PVOID)pProcess, WT_EXECUTELONGFUNCTION); } else { //释放资源 pProcess->m_bUse = false; pProcess->m_lRandID=0; pSql->bUse = false; pSql->lRandID = 0; //暂时不删除此对像,防止出错 delete pProcess; //删除new的对像 } } else { AfxMessageBox(_T("new Process error")); } } catch(...) { LOG_APPERROR(_T("T")); pSql->bUse = false; //有异常,恢复正常模式 pSql->lRandID = 0; } } } i=i+lGetLen; i=i-1; } if ( i >= lp_io->lRecvLen ) //数据刚刚好 { lp_io->lRecvLen = 0; } else { if ( i> 0 ) { //转移数据 lp_io->lRecvLen = lp_io->lRecvLen-i; memcpy(lp_io->buf , lp_io->buf+i , lp_io->lRecvLen ); } } return iGetFrame; } catch (...) 
{ LOG_APPERROR(_T("T")); } return -1; } long CIOCP::GetFrame_Cmpp(IOCP_IO_PTR lp_io) { try { int iGetFrame=0; for ( int i=0 ; ilRecvLen ; i++ ) { if ( i+sizeof(Cmpp_Head) > (DWORD)lp_io->lRecvLen ) { //i--; break; } Cmpp_Head SHead={0}; memcpy(&SHead,lp_io->buf+i,sizeof(Cmpp_Head)); i=i+sizeof(Cmpp_Head); SHead.Command_Id = ntohl(SHead.Command_Id); SHead.Sequence_Id= ntohl(SHead.Sequence_Id); SHead.Total_Length = ntohl(SHead.Total_Length); long lGetLen=SHead.Total_Length-sizeof(Cmpp_Head); if ( SHead.Total_Length <=0 || SHead.Total_Length>BUFFER_SIZE ) //简单判断收到的帧是否有问题 { return -1; //数据不对,关闭连接 } if ( i+lGetLen>lp_io->lRecvLen) //数据不够 { i=i-sizeof(Cmpp_Head); //i--; break; } lp_io->dwEndRecvFrame = ::GetTickCount(); /* if ( SHead.Command_Id == CMPP_ACTIVE_TEST && lp_io->m_lUserID>0 ) //直接回复SMSFunc_Test { Cmpp_Active_Test_Resp * pActive = new Cmpp_Active_Test_Resp; memset(pActive , 0 , sizeof(Cmpp_Active_Test_Resp)); CSmsCenterDlg::SendFrame_Cmpp(lp_io, CMPP_ACTIVE_TEST_RESP ,SHead.Sequence_Id, (BYTE*)pActive , sizeof(Cmpp_Active_Test_Resp) ); //发送返回 } else { IOCP_SQL_PTR pSql = m_KeyList.GetBlank_SQL(); if ( pSql ) { //CProcessSocket * pProcess = m_KeyList.GetBlank_Process(); CProcessSocket * pProcess = new CProcessSocket; if ( pProcess ) { BOOL bOK = true; pProcess->SetDlg(m_pDlg); pProcess->IOCP_setSQL(pSql); Socket_Head head={0}; bOK = pProcess->IOCP_GetFrameData(lp_io , head,SHead , (BYTE*)(lp_io->buf+i) , lGetLen); if ( bOK ) //启动线程处理 { iGetFrame ++; QueueUserWorkItem(CProcessSocket::IOCP_Process, (PVOID)pProcess, WT_EXECUTELONGFUNCTION); } else { //释放资源 pProcess->m_bUse = false; pProcess->m_lRandID=0; pSql->bUse = false; pSql->lRandID = 0; //暂时不删除此对像,防止出错 delete pProcess; //删除new的对像 } } } } */ i=i+lGetLen; i=i-1; } if ( i >= lp_io->lRecvLen ) //数据刚刚好 { lp_io->lRecvLen = 0; } else { if ( i> 0 ) { //转移数据 lp_io->lRecvLen = lp_io->lRecvLen-i; memcpy(lp_io->buf , lp_io->buf+i , lp_io->lRecvLen ); } } return iGetFrame; } catch (...) 
{ LOG_APPERROR(_T("T")); } return -1; } void CIOCP::SetDlg(CSmsCenterDlg *pDlg) { m_pDlg = pDlg; } void CIOCP::InitIoContext_Send(IOCP_IO_SEND_PTR lp_io) { memset( &lp_io->ol, 0, sizeof( WSAOVERLAPPED ) ); } void CIOCP::CheckForInvalidSQLConnection() { try { TCHAR szMsg[256]={0}; DWORD dwNow = ::GetTickCount(); /* LPVOID * lpProcess = m_KeyList.m_Process; if ( lpProcess ) { for ( int i=0 ; im_adoConnection.IsOpen() ) { if ( pSocket->m_bUse && pSocket->m_lRandID>0 ) //在使用中的SQL,40分钟超时 { if ( dwNow - pSocket->m_dwEndUse > SQL_TIMEOUT_USE ) { pSocket->m_AdoRS.Close(); pSocket->m_AdoRS2.Close(); pSocket->m_adoConnection.Close(); pSocket->m_bUse = false; pSocket->m_lRandID = 0; sprintf(szMsg, "SQL:%d 操作超时,自动断连!" , i ); m_pDlg->AddLog(szMsg); } } else { //不在使用中的SQL,20分钟超时 if ( dwNow - pSocket->m_dwEndUse > SQL_TIMEOUT_DISUSE ) { pSocket->m_AdoRS.Close(); pSocket->m_AdoRS2.Close(); pSocket->m_adoConnection.Close(); pSocket->m_bUse = false; pSocket->m_lRandID = 0; sprintf(szMsg, "SQL:%d 超时未使用,自动断连!" , i ); m_pDlg->AddLog(szMsg); } } } } } */ IOCP_SQL_PTR pSql = m_KeyList.m_SQL; if ( pSql ) { for ( int i=0 ; i0 ) //在使用中的SQL,40分钟超时 { if ( dwNow - pSql[i].dwEndUse > SQL_TIMEOUT_USE ) { pSql[i].adoConnection.Close(); pSql[i].bUse = false; pSql[i].lRandID = 0; _stprintf(szMsg, _T("SQL:%d 操作超时,自动断连!") , i ); m_pDlg->AddLog(szMsg); } } else { //不在使用中的SQL,20分钟超时 if ( dwNow - pSql[i].dwEndUse > SQL_TIMEOUT_DISUSE ) { pSql[i].adoConnection.Close(); pSql[i].bUse = false; pSql[i].lRandID = 0; _stprintf(szMsg, _T("SQL:%d 超时未使用,自动断连!") , i ); m_pDlg->AddLog(szMsg); } } } } } } catch (...) 
{ LOG_APPERROR(_T("T")); } } DWORD WINAPI CIOCP::MainLoop2(LPVOID lp_param) { try { AutoCoInitializeEx AutoCoInit; //自动初始化与结束Com环境 CIOCP * piocp = (CIOCP *)lp_param; int nCount = 0; int nCount2= 0; while( !piocp->m_bQuit ) { nCount ++; if ( nCount>10 ) //每5秒检测一次超时 { nCount=0; piocp->CheckForInvalidConnection(); } nCount2 ++; if ( nCount2>120 ) //每1分钟检测一次超时 { nCount2=0; piocp->CheckForInvalidSQLConnection(); } Sleep(500); } return TRUE; } catch(...) { LOG_APPERROR(_T("T")); return false; } }