基础架构设计
master 负责管理 worker 进程,worker 进程负责处理网络事件。整个框架被设计为一种依赖事件驱动、异步、非阻塞的模式。
如此设计的优点:
1.可以充分利用多核机器,增强并发处理能力。
2.多 worker 间可以实现负载均衡。
3.Master 监控并统一管理 worker 行为。在 worker 异常后,可以主动拉起 worker 进程,从而提升了系统的可靠性。并且由 Master 进程控制服务运行中的程序升级、配置项修改等操作,从而增强了整体的动态可扩展与热更的能力。
目录树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 fengyun@ubuntu:~/share/nginx$ tree . ├── app │ ├── makefile │ ├── nginx.cxx │ ├── ngx_c_conf.cxx │ ├── ngx_log.cxx │ ├── ngx_printf.cxx │ ├── ngx_setproctitle.cxx │ └── ngx_string.cxx ├── common.mk ├── config.mk ├── error.log ├── _include │ ├── ngx_c_conf.h │ ├── ngx_c_crc32.h │ ├── ngx_c_lockmutex.h │ ├── ngx_c_memory.h │ ├── ngx_comm.h │ ├── ngx_c_slogic.h │ ├── ngx_c_socket.h │ ├── ngx_c_threadpool.h │ ├── ngx_func.h │ ├── ngx_global.h │ ├── ngx_logiccomm.h │ └── ngx_macro.h ├── logic │ ├── makefile │ └── ngx_c_slogic.cxx ├── logs ├── makefile ├── misc │ ├── makefile │ ├── ngx_c_crc32.cxx │ ├── ngx_c_memory.cxx │ └── ngx_c_threadpool.cxx ├── net │ ├── makefile │ ├── ngx_c_socket_accept.cxx │ ├── ngx_c_socket_conn.cxx │ ├── ngx_c_socket.cxx │ ├── ngx_c_socket_inet.cxx │ ├── ngx_c_socket_request.cxx │ └── ngx_c_socket_time.cxx ├── nginx.conf ├── proc │ ├── makefile │ ├── ngx_daemon.cxx │ ├── ngx_event.cxx │ └── ngx_process_cycle.cxx └── signal ├── makefile └── ngx_signal.cxx 8 directories, 43 files
main函数程序入口函数
一些全局变量的初始化包括读取配置文件(详见本文6.1读取配置文件)
单例类的初始化,包括CRC32校验类,专门分配内存和释放内存的类
日志初始化(详见日志功能 )
信号初始化(详见信号功能详解 )
环境变量搬家(详见本文6.2.1ngx_init_setproctitle)
创建守护进程ngx_daemon(详见守护进程 ),最终以守护进程的形式运行着
进入正式的工作流程死循环ngx_master_process_cycle();
释放资源,虽然一般走不到这里,但是万一以后有需求呢?比较优雅的释放的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 int main (int argc, char *const *argv) { int exitcode = 0 ; int i; g_stopEvent = 0 ; ngx_pid = getpid (); ngx_parent = getppid (); g_argvneedmem = 0 ; for (i = 0 ; i < argc; i++) { g_argvneedmem += strlen (argv[i]) + 1 ; } for (i = 0 ; environ[i]; i++) { g_envneedmem += strlen (environ[i]) + 1 ; } g_os_argc = argc; g_os_argv = (char **) argv; ngx_log.fd = -1 ; ngx_process = NGX_PROCESS_MASTER; ngx_reap = 0 ; CConfig *p_config = CConfig::GetInstance (); if (p_config->Load ("nginx.conf" ) == false ) { ngx_log_init (); ngx_log_stderr (0 ,"配置文件[%s]载入失败,退出!" ,"nginx.conf" ); exitcode = 2 ; goto lblexit; } CMemory::GetInstance (); CCRC32::GetInstance (); ngx_log_init (); if (ngx_init_signals () != 0 ) { exitcode = 1 ; goto lblexit; } if (g_socket.Initialize () == false ) { exitcode = 1 ; goto lblexit; } ngx_init_setproctitle (); if (p_config->GetIntDefault ("Daemon" ,0 ) == 1 ) { int cdaemonresult = ngx_daemon (); if (cdaemonresult == -1 ) { exitcode = 1 ; goto lblexit; } if (cdaemonresult == 1 ) { freeresource (); exitcode = 0 ; return exitcode; } g_daemonized = 1 ; } ngx_master_process_cycle (); lblexit: ngx_log_stderr (0 ,"程序退出,再见了!" ); freeresource (); return exitcode; }
master进程工作 ngx_master_process_cycle(master进程)
信号集初始化并且调用sigaddset防止10个信号的干扰
当master把该做的事情(设置主进程标题ngx_setproctitle和创建子进程ngx_start_worker_processes)做完了就会进入一个死循环for
for里面调用sigsuspend(&set);
进程是挂起的,不占用cpu时间 ,只有收到信号才会被唤醒 ,主进程只依靠信号来驱动
sigsuspend是一个原子操作 ,包含4个步骤:
根据给定的参数设置新的mask 并 阻塞当前进程 【因为是个空集,所以不阻塞任何信号】
此时,一旦收到信号,便恢复原先的信号屏蔽 【我们原来调用sigprocmask()的mask在上边设置的,阻塞了多达10个信号,从而保证我下边的执行流程不会再次被其他信号截断】
调用该信号对应的信号处理函数
信号处理函数返回后,sigsuspend返回,使程序流程继续往下走
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 void ngx_master_process_cycle () { sigset_t set; sigemptyset (&set); sigaddset (&set, SIGCHLD); sigaddset (&set, SIGALRM); sigaddset (&set, SIGIO); sigaddset (&set, SIGINT); sigaddset (&set, SIGHUP); sigaddset (&set, SIGUSR1); sigaddset (&set, SIGUSR2); sigaddset (&set, SIGWINCH); sigaddset (&set, SIGTERM); sigaddset (&set, SIGQUIT); if (sigprocmask (SIG_BLOCK, &set, NULL ) == -1 ) { ngx_log_error_core (NGX_LOG_ALERT,errno,"ngx_master_process_cycle()中sigprocmask()失败!" ); } size_t size; int i; size = sizeof (master_process); size += g_argvneedmem; if (size < 1000 ) { char title[1000 ] = {0 }; strcpy (title,(const char *)master_process); strcat (title," " ); for (i = 0 ; i < g_os_argc; i++) { strcat (title,g_os_argv[i]); } ngx_setproctitle (title); ngx_log_error_core (NGX_LOG_NOTICE,0 ,"%s %P 【master进程】启动并开始运行......!" ,title,ngx_pid); } CConfig *p_config = CConfig::GetInstance (); int workprocess = p_config->GetIntDefault ("WorkerProcesses" ,1 ); ngx_start_worker_processes (workprocess); sigemptyset (&set); for ( ;; ) { sigsuspend (&set); sleep (1 ); } return ; }
ngx_spawn_process(master进程创建子进程) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static int ngx_spawn_process (int inum,const char *pprocname) { pid_t pid; pid = fork(); switch (pid) { case -1 : ngx_log_error_core (NGX_LOG_ALERT,errno,"ngx_spawn_process()fork()产生子进程num=%d,procname=\"%s\"失败!" ,inum,pprocname); return -1 ; case 0 : ngx_parent = ngx_pid; ngx_pid = getpid (); ngx_worker_process_cycle (inum,pprocname); break ; default : break ; } return pid; }
ngx_open_listening_sockets监听端口并且初始化(worker)
NGINX官方socket()和bind(),listen()创建一个监听套接字是在master进程,子进程中调用accept(),这也是我先前的作法,但是这样会造成惊群现象 ,官方解决方法是加一把锁,但是这个方法并不是那么妥当,我选择了在worker中创建不同的监听套接字以避免惊群现象,详情原因见本文末的惊群
1.对于创建的每个要监听的端口都要创建1个socket,ipv4,任意地址,所有网卡设定。
2.setsockopt(isock,SOL_SOCKET, SO_REUSEADDR,(const void *) &reuseaddr, sizeof(reuseaddr))
允许重用本地地址,主要是解决TIME_WAIT这个状态导致bind()失败的问题
3.setnonblocking(isock)
设置isock为非阻塞,这样在后续accept的时候就不再会去阻塞住了,
4.对isock进行bind(),listen()
5.将各个监听的isock(目前为2个)放入监听套接字队列CSocekt::vector m_ListenSocketList;
注意:这是在主进程中创建监听端口(主进程执行这个函数),一旦后续fork()出来四个子进程,五个进程都在监听80和443两个端口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 bool CSocekt::ngx_open_listening_sockets () { int isock; struct sockaddr_in serv_addr ; int iport; char strinfo[100 ]; memset (&serv_addr,0 ,sizeof (serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); CConfig *p_config = CConfig::GetInstance (); for (int i = 0 ; i < m_ListenPortCount; i++) { isock = socket (AF_INET,SOCK_STREAM,0 ); if (isock == -1 ) { ngx_log_stderr (errno,"CSocekt::Initialize()中socket()失败,i=%d." ,i); return false ; } int reuseaddr = 1 ; if (setsockopt (isock,SOL_SOCKET, SO_REUSEADDR,(const void *) &reuseaddr, sizeof (reuseaddr)) == -1 ) { ngx_log_stderr (errno,"CSocekt::Initialize()中setsockopt(SO_REUSEADDR)失败,i=%d." ,i); close (isock); return false ; } int reuseport = 1 ; if (setsockopt (isock, SOL_SOCKET, SO_REUSEPORT,(const void *) &reuseport, sizeof (int ))== -1 ) { ngx_log_stderr (errno,"CSocekt::Initialize()中setsockopt(SO_REUSEPORT)失败" ,i); } if (setnonblocking (isock) == false ) { ngx_log_stderr (errno,"CSocekt::Initialize()中setnonblocking()失败,i=%d." ,i); close (isock); return false ; } strinfo[0 ] = 0 ; sprintf (strinfo,"ListenPort%d" ,i); iport = p_config->GetIntDefault (strinfo,10000 ); serv_addr.sin_port = htons ((in_port_t )iport); if (bind (isock, (struct sockaddr*)&serv_addr, sizeof (serv_addr)) == -1 ) { ngx_log_stderr (errno,"CSocekt::Initialize()中bind()失败,i=%d." ,i); close (isock); return false ; } if (listen (isock,NGX_LISTEN_BACKLOG) == -1 ) { ngx_log_stderr (errno,"CSocekt::Initialize()中listen()失败,i=%d." ,i); close (isock); return false ; } lpngx_listening_t p_listensocketitem = new ngx_listening_t ; memset (p_listensocketitem,0 ,sizeof (ngx_listening_t )); p_listensocketitem->port = iport; p_listensocketitem->fd = isock; ngx_log_error_core (NGX_LOG_INFO,0 ,"监听%d端口成功!" ,iport); m_ListenSocketList.push_back (p_listensocketitem); } if (m_ListenSocketList.size () <= 0 ) return false ; return true ; }
worker进程初始化工作 ngx_worker_process_init 1.放开信号集屏蔽
2.创建线程池中的线程g_threadpool.create详见本文后续内容,这些线程是用于处理消息队列里的消息的,等到消息队列有消息的时候,主线程会唤醒他们。
3.Initialize_subproc初始化发消息互斥量,连接互斥量,连接回收队列相关互斥量、时间处理队列互斥量,初始化发消息的信号量等以及创建1个发送数据的线程,1个回收连接的线程,1个处理到期不发心跳包的线程。(以及在这里创建监听端口)
4.ngx_epoll_init();初始化epoll相关内容,同时 往监听socket上增加监听事件,从而开始让监听端口履行其职责
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static void ngx_worker_process_init (int inum) { sigset_t set; sigemptyset (&set); if (sigprocmask (SIG_SETMASK, &set, NULL ) == -1 ) { ngx_log_error_core (NGX_LOG_ALERT,errno,"ngx_worker_process_init()中sigprocmask()失败!" ); } CConfig *p_config = CConfig::GetInstance (); int tmpthreadnums = p_config->GetIntDefault ("ProcMsgRecvWorkThreadCount" ,5 ); if (g_threadpool.Create (tmpthreadnums) == false ) { exit (-2 ); } sleep (1 ); if (g_socket.Initialize_subproc () == false ) { exit (-2 ); } g_socket.ngx_epoll_init (); return ; }
Initialize_subproc子进程中才需要执行的初始化函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 bool CSocekt::Initialize_subproc () { if (ngx_open_listening_sockets () == false ) return false ; if (pthread_mutex_init (&m_sendMessageQueueMutex, NULL ) != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_mutex_init(&m_sendMessageQueueMutex)失败." ); return false ; } if (pthread_mutex_init (&m_connectionMutex, NULL ) != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_mutex_init(&m_connectionMutex)失败." ); return false ; } if (pthread_mutex_init (&m_recyconnqueueMutex, NULL ) != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_mutex_init(&m_recyconnqueueMutex)失败." ); return false ; } if (pthread_mutex_init (&m_timequeueMutex, NULL ) != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_mutex_init(&m_timequeueMutex)失败." ); return false ; } if (sem_init (&m_semEventSendQueue,0 ,0 ) == -1 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中sem_init(&m_semEventSendQueue,0,0)失败." ); return false ; } int err; ThreadItem *pSendQueue; m_threadVector.push_back (pSendQueue = new ThreadItem (this )); err = pthread_create (&pSendQueue->_Handle, NULL , ServerSendQueueThread,pSendQueue); if (err != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_create(ServerSendQueueThread)失败." ); return false ; } ThreadItem *pRecyconn; m_threadVector.push_back (pRecyconn = new ThreadItem (this )); err = pthread_create (&pRecyconn->_Handle, NULL , ServerRecyConnectionThread,pRecyconn); if (err != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_create(ServerRecyConnectionThread)失败." ); return false ; } if (m_ifkickTimeCount == 1 ) { ThreadItem *pTimemonitor; m_threadVector.push_back (pTimemonitor = new ThreadItem (this )); err = pthread_create (&pTimemonitor->_Handle, NULL , ServerTimerQueueMonitorThread,pTimemonitor); if (err != 0 ) { ngx_log_stderr (0 ,"CSocekt::Initialize_subproc()中pthread_create(ServerTimerQueueMonitorThread)失败." ); return false ; } } return true ; }
ngx_epoll_init 创建监听端口是在父进程(子进程)中(ngx_open_listening_sockets)进行的,那么完整的初始化监听socket(包括创建连接池并且将每个监听socket放入到连接池的连接和放入epoll红黑树开始监听事件)是在子进程
1.在这里创建一个epoll对象,一定要判断返回值,这是一个好习惯
2.创建连接池,详见本文后续内容连接池
3.每个监听socket增加一个 连接池中的连接,同时连接池内的连接(只有是监听socket的连接才可以)也可以通过lpngx_connection_t::p_Conn->listening = (*pos);
找到监听socket对象。
4.设置连接池内的监听socket的写事件处理函数为p_Conn->rhandler = &CSocekt::ngx_event_accept;
,此函数用于处理三次握手事件,对于三次握手的新连接又会重新指定写事件处理函数和读事件处理函数
5.为每一个监听socket 上增加(EPOLL_CTL_ADD)监听的事件,读事件和TCP连接半关闭分别对应EPOLLIN|EPOLLRDHUP
事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 int CSocekt::ngx_epoll_init () { m_epollhandle = epoll_create (m_worker_connections); if (m_epollhandle == -1 ) { ngx_log_stderr (errno,"CSocekt::ngx_epoll_init()中epoll_create()失败." ); exit (2 ); } initconnection (); std::vector<lpngx_listening_t >::iterator pos; for (pos = m_ListenSocketList.begin (); pos != m_ListenSocketList.end (); ++pos) { lpngx_connection_t p_Conn = ngx_get_connection ((*pos)->fd); if (p_Conn == NULL ) { ngx_log_stderr (errno,"CSocekt::ngx_epoll_init()中ngx_get_connection()失败." ); exit (2 ); } p_Conn->listening = (*pos); (*pos)->connection = p_Conn; p_Conn->rhandler = &CSocekt::ngx_event_accept; if (ngx_epoll_oper_event ( (*pos)->fd, EPOLL_CTL_ADD, EPOLLIN|EPOLLRDHUP, 0 , p_Conn ) == -1 ) { exit (2 ); } } return 1 ; }
ngx_close_connection直接关闭连接 1 2 3 4 5 6 7 8 9 10 11 12 13 void CSocekt::ngx_close_connection (lpngx_connection_t pConn) { ngx_free_connection (pConn); if (pConn->fd != -1 ) { close (pConn->fd); pConn->fd = -1 ; } return ; }
ngx_epoll_oper_event对epoll事件的具体操作(增删改)
注意:原来的理解中,绑定ev.data.ptr这个事,只在EPOLL_CTL_ADD的时候做一次即可,但是发现EPOLL_CTL_MOD似乎会破坏掉ev.data.ptr ,因此不管是EPOLL_CTL_ADD,还是EPOLL_CTL_MOD,ev.data.ptr都要去重新绑定!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 int CSocekt::ngx_epoll_oper_event ( int fd, uint32_t eventtype, uint32_t flag, int bcaction, lpngx_connection_t pConn ) { struct epoll_event ev ; memset (&ev, 0 , sizeof (ev)); if (eventtype == EPOLL_CTL_ADD) { ev.events = flag; pConn->events = flag; } else if (eventtype == EPOLL_CTL_MOD) { ev.events = pConn->events; if (bcaction == 0 ) { ev.events |= flag; } else if (bcaction == 1 ) { ev.events &= ~flag; } else { ev.events = flag; } pConn->events = ev.events; } else { return 1 ; } ev.data.ptr = (void *)pConn; if (epoll_ctl (m_epollhandle,eventtype,fd,&ev) == -1 ) { ngx_log_stderr (errno,"CSocekt::ngx_epoll_oper_event()中epoll_ctl(%d,%ud,%ud,%d)失败。" ,fd,eventtype,flag,bcaction); return -1 ; } return 1 ; }
worker子进程处理网络事件(读、写)和定时器事件 事件驱动模型ngx_epoll_process_events 子进程初始化完监听端口和设置好子进程标题之后,执行for死循环,死循环内不断调用ngx_epoll_process_events。
“事件驱动”,无非就是通过获取事件,通过获取到的事件并根据这个事件来调用适当的函数从而让整个程序干活,无非就是这点事;
1.事件驱动我们第一步一定是获取事件,如何获取事件,调用epoll_wait()。
2.一定要严密的判断,epoll_wait返回事件的数目,而事件会返回到参数m_events里,先判断events数目执行对应的判断。
3.然后for循环里不断地通过m_events[i].data.ptr把发生了事件的连接池中的连接 取出来并且revents = m_events[i].events;
取出这个连接的事件类型
4.对于每一个发生事件的连接 判断发生事件的类型, 对于读事件,(this->* (p_Conn->rhandler) )(p_Conn);
函数指针 对于监听套接字的连接会调用CSocekt::ngx_event_accept(c)
,这在子进程创建时进行初始化ngx_epoll_init函数中就已经将连接池内的监听套接字连接的函数指针指定到ngx_event_accept上 如果是已经连入,发送数据到这里,则这里执行的应该是 CSocekt::ngx_read_request_handler()
注意我这个ngx_epoll_process_events中epoll_wait相当于事件收集器,各个事件对应的处理函数都属于事件处理器,用来消费事件。因此每个处理函数不能够被阻塞,而且应该尽快执行完成 ,否则整个for死循环中的ngx_epoll_process_events卡住了,下一次epoll_wait函数积累的事件越来越多整个程序就会崩盘了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 int CSocekt::ngx_epoll_process_events (int timer) { int events = epoll_wait (m_epollhandle,m_events,NGX_MAX_EVENTS,timer); if (events == -1 ) { if (errno == EINTR) { ngx_log_error_core (NGX_LOG_INFO,errno,"CSocekt::ngx_epoll_process_events()中epoll_wait()失败!" ); return 1 ; } else { ngx_log_error_core (NGX_LOG_ALERT,errno,"CSocekt::ngx_epoll_process_events()中epoll_wait()失败!" ); return 0 ; } } if (events == 0 ) { if (timer != -1 ) { return 1 ; } ngx_log_error_core (NGX_LOG_ALERT,0 ,"CSocekt::ngx_epoll_process_events()中epoll_wait()没超时却没返回任何事件!" ); return 0 ; } ngx_log_stderr (0 ,"惊群测试:events=%d,进程id=%d" ,events,ngx_pid); lpngx_connection_t p_Conn uint32_t revents; for (int i = 0 ; i < events; ++i) { p_Conn = (lpngx_connection_t )(m_events[i].data.ptr); revents = m_events[i].events; if (revents & EPOLLIN) { (this ->* (p_Conn->rhandler) )(p_Conn); } if (revents & EPOLLOUT) { if (revents & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { --p_Conn->iThrowsendCount; } else { (this ->* (p_Conn->whandler) )(p_Conn); } } } return 1 ; }
处理三次握手连入事件ngx_event_accept 三次握手进来了,触发了epoll的读事件,前来调用此函数。accept是从已完成连接established队列取出该socket。
1.用accept4或者accept返回得到socket注意设置成非阻塞。如果设置非阻塞失败那么必须要回收连接池的连接并且关闭socket。
2.给新连接分配一个连接池内的连接ngx_get_connection。
3.连接池内的连接设置这个连接的处理函数 newc->rhandler = &CSocekt::ngx_read_request_handler; //设置已建立连接的socket当客户端发来数据来时的读处理函数 newc->whandler = &CSocekt::ngx_write_request_handler; //设置已建立连接的socket的写处理函数
4.客户端应该主动发送第一次的数据,这里将读事件加入epoll监控ngx_epoll_oper_event
,这样当客户端发送数据来时,ngx_read_request_handler()被ngx_epoll_process_events()调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 void CSocekt::ngx_event_accept (lpngx_connection_t oldc) { struct sockaddr mysockaddr ; socklen_t socklen; int err; int level; int s; static int use_accept4 = 1 ; lpngx_connection_t newc; socklen = sizeof (mysockaddr); do { if (use_accept4) { s = accept4 (oldc->fd, &mysockaddr, &socklen, SOCK_NONBLOCK); } else { s = accept (oldc->fd, &mysockaddr, &socklen); } if (s == -1 ) { err = errno; if (err == EAGAIN) { return ; } level = NGX_LOG_ALERT; if (err == ECONNABORTED) { level = NGX_LOG_ERR; } else if (err == EMFILE || err == ENFILE) { level = NGX_LOG_CRIT; } if (use_accept4 && err == ENOSYS) { use_accept4 = 0 ; continue ; } if (err == ECONNABORTED) { } if (err == EMFILE || err == ENFILE) { } return ; } if (m_onlineUserCount >= m_worker_connections) { close (s); return ; } if (m_connectionList.size () > (m_worker_connections * 5 )) { if (m_freeconnectionList.size () < m_worker_connections) { close (s); return ; } } newc = ngx_get_connection (s); if (newc == NULL ) { if (close (s) == -1 ) { ngx_log_error_core (NGX_LOG_ALERT,errno,"CSocekt::ngx_event_accept()中close(%d)失败!" ,s); } return ; } memcpy (&newc->s_sockaddr,&mysockaddr,socklen); if (!use_accept4) { if (setnonblocking (s) == false ) { ngx_close_connection (newc); return ; } } newc->listening = oldc->listening; newc->rhandler = &CSocekt::ngx_read_request_handler; newc->whandler = &CSocekt::ngx_write_request_handler; if (ngx_epoll_oper_event ( s, EPOLL_CTL_ADD, EPOLLIN|EPOLLRDHUP, 0 , newc ) == -1 ) { ngx_close_connection (newc); return ; } if (m_ifkickTimeCount == 1 ) { AddToTimerQueue (newc); } ++m_onlineUserCount; break ; } while (1 ); return ; }
处理TCP连接客户端发来的数据ngx_read_request_handler 引入一个消息头【结构】STRUC_MSG_HEADER
,用来记录一些额外信息,可以用于处理过时包 服务器 收包时, 收到: 包头+包体 ,我再额外附加一个消息头 ===》 消息头 + 包头 + 包体
1 2 3 4 5 6 7 typedef struct _STRUC_MSG_HEADER { lpngx_connection_t pConn; uint64_t iCurrsequence; }STRUC_MSG_HEADER,*LPSTRUC_MSG_HEADER;
对于连接池的每个连接都是有一个序号iCurrsequence,连接初始化的时候++iCurrsequence,连接释放的时候++iCurrsequence,因此当收到一个包的时候,将连接连接池的序号iCurrsequence记录在包的消息头中,当处理完这个包后想要发回给客户端的时候可以比较一下包的iCurrsequence与连接池的iCurrsequence是否一致,如果不一致说明连接已经断开了,丢弃即可。
因此我们需要new一块新内存专门用来存取消息头 + 包头 + 包体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 void CSocekt::ngx_read_request_handler (lpngx_connection_t pConn) { bool isflood = false ; ssize_t reco = recvproc (pConn,pConn->precvbuf,pConn->irecvlen); if (reco <= 0 ) { return ; } if (pConn->curStat == _PKG_HD_INIT) { if (reco == m_iLenPkgHeader) { ngx_wait_request_handler_proc_p1 (pConn,isflood); } else { pConn->curStat = _PKG_HD_RECVING; pConn->precvbuf = pConn->precvbuf + reco; pConn->irecvlen = pConn->irecvlen - reco; } } else if (pConn->curStat == _PKG_HD_RECVING) { if (pConn->irecvlen == reco) { ngx_wait_request_handler_proc_p1 (pConn,isflood); } else { pConn->precvbuf = pConn->precvbuf + reco; pConn->irecvlen = pConn->irecvlen - reco; } } else if (pConn->curStat == _PKG_BD_INIT) { if (reco == pConn->irecvlen) { if (m_floodAkEnable == 1 ) { isflood = TestFlood (pConn); } ngx_wait_request_handler_proc_plast (pConn,isflood); } else { pConn->curStat = _PKG_BD_RECVING; pConn->precvbuf = pConn->precvbuf + reco; pConn->irecvlen = pConn->irecvlen - reco; } } else if (pConn->curStat == _PKG_BD_RECVING) { if (pConn->irecvlen == reco) { if (m_floodAkEnable == 1 ) { isflood = TestFlood (pConn); } ngx_wait_request_handler_proc_plast (pConn,isflood); } else { pConn->precvbuf = pConn->precvbuf + reco; pConn->irecvlen = pConn->irecvlen - reco; } } if (isflood == true ) { zdClosesocketProc (pConn); } return ; }
注意这个函数调用了recvproc(pConn,pConn->precvbuf,pConn->irecvlen);
最多只能收pConn->irecvlen个字节
注意采用了状态机,非常的稳健!!
recvproc 四次挥手关闭连接,RST强制关闭连接,正常发包都能检测。
伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 recvproc(lpngx_connection_t pConn,char *buff,ssize_t buflen) n = recv(c->fd,buff,buflen,0 ); 对返回值n判断,如果n的值异常,根据errno写相应日志或关闭socket,然后直接return -1 ; if n==0 zdClosesocketProc(pConn);return -1 ; if n<0 if (errno == EAGAIN || errno == EWOULDBLOCK)ngx_log_stderr;return -1 ; if (errno == EINTR)ngx_log_stderr;return -1 ; if (errno == ECONNRESET)do nothing else ngx_log_stderr; zdClosesocketProc(c); return -1 ; n>0 ,返回实际收到的字节数
如果recv()有问题,这个函数会把该释放的释放好,该处理的处理好
特别注意: 所有从10行开始走下来的错误,都认为异常:意味着我们要关闭客户端套接字要回收连接池中连接;
一定要有严密的判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 ssize_t CSocekt::recvproc (lpngx_connection_t pConn,char *buff,ssize_t buflen) { ssize_t n; n = recv (pConn->fd, buff, buflen, 0 ); if (n == 0 ) { zdClosesocketProc (pConn); return -1 ; } if (n < 0 ) { if (errno == EAGAIN || errno == EWOULDBLOCK) { ngx_log_stderr (errno,"CSocekt::recvproc()中errno == EAGAIN || errno == EWOULDBLOCK成立,出乎我意料!" ); return -1 ; } if (errno == EINTR) { ngx_log_stderr (errno,"CSocekt::recvproc()中errno == EINTR成立,出乎我意料!" ); return -1 ; } if (errno == ECONNRESET) { } else { if (errno == EBADF) { } else { ngx_log_stderr (errno,"CSocekt::recvproc()中发生错误,我打印出来看看是啥错误!" ); } } zdClosesocketProc (pConn); return -1 ; } return n; }
ngx_wait_request_handler_proc_p1 包头收完整后的处理,我们称为包处理阶段1
1.判断包是否合法,若不合法则状态机恢复为_PKG_HD_INIT,并且连接池的缓冲区头指针重新指定为最开始的
2.那么char *pTmpBuffer = (char *)p_memory->AllocMemory(m_iLenMsgHeader + e_pkgLen,false);
新分配一段内存 并且将连接池的成员指针pConn->precvMemPointer = pTmpBuffer;
指向这块内存(消息头加整个包长度的内存 )
3.填写消息头的内容,把连接池中连接序号记录到消息头里来,连接池的连接指针也记录到消息头里
4.再填写包头内容,把包体内容拷贝到pTmpBuffer中
5.如果该报文只有包头无包体,调用ngx_wait_request_handler_proc_plast()收完整包后的处理函数。
6.如果该报文还需要继续收包体,修改状态机为_PKG_BD_INIT,并且连接池的缓冲区头指针指向包体。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 void CSocekt::ngx_wait_request_handler_proc_p1 (lpngx_connection_t pConn,bool &isflood) { CMemory *p_memory = CMemory::GetInstance (); LPCOMM_PKG_HEADER pPkgHeader; pPkgHeader = (LPCOMM_PKG_HEADER)pConn->dataHeadInfo; unsigned short e_pkgLen; e_pkgLen = ntohs (pPkgHeader->pkgLen); if (e_pkgLen < m_iLenPkgHeader) { pConn->curStat = _PKG_HD_INIT; pConn->precvbuf = pConn->dataHeadInfo; pConn->irecvlen = m_iLenPkgHeader; } else if (e_pkgLen > (_PKG_MAX_LENGTH-1000 )) { pConn->curStat = _PKG_HD_INIT; pConn->precvbuf = pConn->dataHeadInfo; pConn->irecvlen = m_iLenPkgHeader; } else { char *pTmpBuffer = (char *)p_memory->AllocMemory (m_iLenMsgHeader + e_pkgLen,false ); pConn->precvMemPointer = pTmpBuffer; LPSTRUC_MSG_HEADER ptmpMsgHeader = (LPSTRUC_MSG_HEADER)pTmpBuffer; ptmpMsgHeader->pConn = pConn; ptmpMsgHeader->iCurrsequence = pConn->iCurrsequence; pTmpBuffer += m_iLenMsgHeader; memcpy (pTmpBuffer,pPkgHeader,m_iLenPkgHeader); if (e_pkgLen == m_iLenPkgHeader) { if (m_floodAkEnable == 1 ) { isflood = TestFlood (pConn); } ngx_wait_request_handler_proc_plast (pConn,isflood); } else { pConn->curStat = _PKG_BD_INIT; pConn->precvbuf = pTmpBuffer + m_iLenPkgHeader; pConn->irecvlen = e_pkgLen - m_iLenPkgHeader; } } return ; }
ngx_wait_request_handler_proc_plast 如果包没问题那么就加上消息头加入收包消息队列CThreadPool::m_MsgRecvq1ueue 并触发线程处理消息,注意进入 并且要将当前连接的状态机 恢复为_PKG_HD_INIT,并且连接池的缓冲区头指针重新指定为最开始的,且将precvMemPointer指针置为NULL(原先指向消息头加整个包的内存)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void CSocekt::ngx_wait_request_handler_proc_plast (lpngx_connection_t pConn,bool &isflood) { if (isflood == false ) { g_threadpool.inMsgRecvQueueAndSignal (pConn->precvMemPointer); } else { CMemory *p_memory = CMemory::GetInstance (); p_memory->FreeMemory (pConn->precvMemPointer); } pConn->precvMemPointer = NULL ; pConn->curStat = _PKG_HD_INIT; pConn->precvbuf = pConn->dataHeadInfo; pConn->irecvlen = m_iLenPkgHeader; return ; }
inMsgRecvQueueAndSignal 注意传入的参数是消息头+包头+包体,这是另外new出来的一块内存
将这一个完整消息放入线程池中的线程 的收包的消息队列 里去
并且调用Call激发线程来干活
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void CThreadPool::inMsgRecvQueueAndSignal (char *buf) { int err = pthread_mutex_lock (&m_pthreadMutex); if (err != 0 ) { ngx_log_stderr (err,"CThreadPool::inMsgRecvQueueAndSignal()pthread_mutex_lock()失败,返回的错误码为%d!" ,err); } m_MsgRecvQueue.push_back (buf); ++m_iRecvMsgQueueCount; err = pthread_mutex_unlock (&m_pthreadMutex); if (err != 0 ) { ngx_log_stderr (err,"CThreadPool::inMsgRecvQueueAndSignal()pthread_mutex_unlock()失败,返回的错误码为%d!" ,err); } Call (); return ; }
Call pthread_cond_signal唤醒一个等待该条件的线程,也就是可以唤醒卡在pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex)
的线程
1 2 3 4 5 while ( (pThreadPoolObj->m_MsgRecvQueue.size () == 0 ) && m_shutdown == false ) { if (pThread->ifrunning == false ) pThread->ifrunning = true ; pthread_cond_wait (&m_pthreadCond, &m_pthreadMutex); }
如果拿得到消息即m_MsgRecvQueue.size()
大小不为0,那么其中1个线程就可以拿到锁并且退出while循环,退出while循环去取消息并且调用相应的消息处理函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void CThreadPool::Call () { int err = pthread_cond_signal (&m_pthreadCond); if (err != 0 ) { ngx_log_stderr (err,"CThreadPool::Call()中pthread_cond_signal()失败,返回的错误码为%d!" ,err); } if (m_iThreadNum == m_iRunningThreadNum) { time_t currtime = time (NULL ); if (currtime - m_iLastEmgTime > 10 ) { m_iLastEmgTime = currtime; ngx_log_stderr (0 ,"CThreadPool::Call()中发现线程池中当前空闲线程数量为0,要考虑扩容线程池了!" ); } } return ; }
处理TCP连接发送数据 什么叫socekt可写? 每一个tcp连接(socket),都会有一个接收缓冲区 和 一个发送缓冲; 发送缓冲区缺省大小一般10几k,接收缓冲区大概几十k,setsocketopt()来设置;
send()【Windows端】,write()【linux】发送数据时,实际上这两个函数是把数据放到了发送缓冲区,之后这两个函数返回了;【这两个函数执行成功不代表客户端已经成功收到数据,只有在客户端用了recv()或read()收到数据后,告诉服务器已经确认收到了数据,这样服务器才会把发送缓冲区中这些数据清空】
如果服务器端的发送 缓冲区满了,那么服务器再调用send(),write()发送数据的时候,那么send(),write()函数就会返回一个EAGAIN;;EAGAIN不是一个错误,只是示意发送缓冲区已经满了,迟一些再调用send(),write()来发送数据吧;
针对 当socket可写的时候【发送缓冲区没满】,会不停的触发socket可写事件 ,我们提出两种解决方案【重要】;
两种解决方案,来自网络,意义在于我们可以通过这种解决方案来指导我们写代码;
需要向socket写数据的时候把socket写事件通知加入到epoll中,等待可写事件,当可写事件来时操作系统会通知咱们;此时咱们可以调用wirte/send函数发送数据,当发送数据完毕后,把socket的写事件通知从红黑树中移除; 缺点:即使发送很少的数据,也需要把事件通知加入到epoll,写完毕后,有需要把写事件通知从红黑树干掉,对效率有一定的影响【有一定的操作代价】
开始不把socket写事件通知加入到epoll,当我需要写数据的时候,直接调用write/send发送数据 ,如果能全部发送完那么最好;
但如果发送缓冲区满了send()返回了EAGIN 无法一次性全部发送完,此时,我再把写事件通知加入到epoll,此时,就变成了在epoll驱动下写数据
当发送缓冲区可写(即有空间了)的时候,ngx_epoll_process_events函数中epoll_wait会通知有可写事件,这个时候我们调用ngx_write_request_handler来处理可写事件,当全部数据发送完毕后,再把写事件通知从epoll中干掉;
优点:数据不多的时候,可以避免epoll的写事件的增加/删除,提高了程序的执行效率;
msgSend待发送消息入到发消息队列 主要的逻辑:
1.CSocket::m_MsgSendQueue.push_back(psendbuf);将消息放入待发包消息队列 中
2.sem_post(&m_semEventSendQueue)让ServerSendQueueThread()流程走下来干活
同步原理:
CSocekt::Initialize_subproc
函数中初始化调用sem_init(&m_semEventSendQueue,0,0)
对此信号量进行初始化为0。
sem_wait():测试指定信号量的值,如果该值>0,那么将该值减1 然后该函数立即返回;如果该值 等于0,那么该线程将投入睡眠中,一直到该值>0,这个时候 那么将该值减1 然后该函数立即返回;
semd_post():能够将指定信号量值加1 ,即便当前没有其他线程在等待该信号量值也没关系;这也就保证了信号量不会丢失
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void CSocekt::msgSend (char *psendbuf) { CMemory *p_memory = CMemory::GetInstance (); CLock lock (&m_sendMessageQueueMutex) ; if (m_iSendMsgQueueCount > 50000 ) { m_iDiscardSendPkgCount++; p_memory->FreeMemory (psendbuf); return ; } LPSTRUC_MSG_HEADER pMsgHeader = (LPSTRUC_MSG_HEADER)psendbuf; lpngx_connection_t p_Conn = pMsgHeader->pConn; if (p_Conn->iSendCount > 400 ) { ngx_log_stderr (0 ,"CSocekt::msgSend()中发现某用户%d积压了大量待发送数据包,切断与他的连接!" ,p_Conn->fd); m_iDiscardSendPkgCount++; p_memory->FreeMemory (psendbuf); zdClosesocketProc (p_Conn); return ; } ++p_Conn->iSendCount; m_MsgSendQueue.push_back (psendbuf); ++m_iSendMsgQueueCount; if (sem_post (&m_semEventSendQueue)==-1 ) { ngx_log_stderr (0 ,"CSocekt::msgSend()中sem_post(&m_semEventSendQueue)失败." ); } return ; }
ServerSendQueueThread处理发送消息队列的线程* pthread_mutex_lock(&pSocketObj->m_sendMessageQueueMutex); //因为我们要操作发送消息对列
发送消息,如果发送缓冲区满了,则需要通过epoll事件来驱动消息的继续发送,所以如果发送缓冲区满,则用这个连接池成员变量ngx_connection_s::atomic<int>iThrowsendCount;
标记是依靠epoll来驱动的
1.sendsize>0且成功发送了整个包的所有数据,一下就发送出去这很顺利,那么把堆里面的存放消息的那块内存释放掉即可
2.sendsize>0但是没有全部发送完毕(EAGAIN),数据只发出去了一部分,但肯定是因为 发送缓冲区满了,EPOLL_MOD给当前连接增加一个监听可写事件
3.sendsize == 0对方断开了,对方断开这个事件属于可读事件,会再recvproc函数中处理
4.sendsize == -1表明 errno == EAGAIN ,服务器的发送缓冲区满了,那么和第2种清空一样,需要通过EPOLL_MOD给当前连接增加一个监听可写事件
5.sendsize == -2,一般我认为都是对端断开的错误,对端断开需要把堆里面的存放消息的那块内存释放掉即可,操作系统会帮我们将这个socket连接从红黑树中移除掉
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 void * CSocekt::ServerSendQueueThread (void * threadData) { ThreadItem *pThread = static_cast <ThreadItem*>(threadData); CSocekt *pSocketObj = pThread->_pThis; int err; std::list <char *>::iterator pos,pos2,posend; char *pMsgBuf; LPSTRUC_MSG_HEADER pMsgHeader; LPCOMM_PKG_HEADER pPkgHeader; lpngx_connection_t p_Conn; unsigned short itmp; ssize_t sendsize; CMemory *p_memory = CMemory::GetInstance (); while (g_stopEvent == 0 ) { if (sem_wait (&pSocketObj->m_semEventSendQueue) == -1 ) { if (errno != EINTR) ngx_log_stderr (errno,"CSocekt::ServerSendQueueThread()中sem_wait(&pSocketObj->m_semEventSendQueue)失败." ); } if (g_stopEvent != 0 ) break ; if (pSocketObj->m_iSendMsgQueueCount > 0 ) { err = pthread_mutex_lock (&pSocketObj->m_sendMessageQueueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerSendQueueThread()中pthread_mutex_lock()失败,返回的错误码为%d!" ,err); pos = pSocketObj->m_MsgSendQueue.begin (); posend = pSocketObj->m_MsgSendQueue.end (); while (pos != posend) { pMsgBuf = (*pos); pMsgHeader = (LPSTRUC_MSG_HEADER)pMsgBuf; pPkgHeader = (LPCOMM_PKG_HEADER)(pMsgBuf+pSocketObj->m_iLenMsgHeader); p_Conn = pMsgHeader->pConn; if (p_Conn->iCurrsequence != pMsgHeader->iCurrsequence) { pos2=pos; pos++; pSocketObj->m_MsgSendQueue.erase (pos2); --pSocketObj->m_iSendMsgQueueCount; p_memory->FreeMemory (pMsgBuf); continue ; } if (p_Conn->iThrowsendCount > 0 ) { pos++; continue ; } --p_Conn->iSendCount; p_Conn->psendMemPointer = pMsgBuf; pos2=pos; pos++; pSocketObj->m_MsgSendQueue.erase (pos2); --pSocketObj->m_iSendMsgQueueCount; p_Conn->psendbuf = (char *)pPkgHeader; itmp = ntohs (pPkgHeader->pkgLen); p_Conn->isendlen = itmp; sendsize = pSocketObj->sendproc (p_Conn,p_Conn->psendbuf,p_Conn->isendlen); if (sendsize > 0 ) { if (sendsize == p_Conn->isendlen) { p_memory->FreeMemory (p_Conn->psendMemPointer); p_Conn->psendMemPointer = NULL ; p_Conn->iThrowsendCount = 0 ; } else { p_Conn->psendbuf = p_Conn->psendbuf + sendsize; p_Conn->isendlen = p_Conn->isendlen - sendsize; ++p_Conn->iThrowsendCount; if (pSocketObj->ngx_epoll_oper_event ( p_Conn->fd, EPOLL_CTL_MOD, EPOLLOUT, 0 , p_Conn ) == -1 ) { ngx_log_stderr (errno,"CSocekt::ServerSendQueueThread()ngx_epoll_oper_event()失败." ); } } continue ; } else if (sendsize == 0 ) { p_memory->FreeMemory (p_Conn->psendMemPointer); p_Conn->psendMemPointer = NULL ; p_Conn->iThrowsendCount = 0 ; continue ; } else if (sendsize == -1 ) { ++p_Conn->iThrowsendCount; if (pSocketObj->ngx_epoll_oper_event ( p_Conn->fd, EPOLL_CTL_MOD, EPOLLOUT, 0 , p_Conn ) == -1 ) { ngx_log_stderr (errno,"CSocekt::ServerSendQueueThread()中ngx_epoll_add_event()_2失败." ); } continue ; } else { p_memory->FreeMemory (p_Conn->psendMemPointer); p_Conn->psendMemPointer = NULL ; p_Conn->iThrowsendCount = 0 ; continue ; } } err = pthread_mutex_unlock (&pSocketObj->m_sendMessageQueueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerSendQueueThread()pthread_mutex_unlock()失败,返回的错误码为%d!" ,err); } } return (void *)0 ; }
sendproc发送数据专用函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 ssize_t CSocekt::sendproc (lpngx_connection_t c,char *buff,ssize_t size) { ssize_t n; for ( ;; ) { n = send (c->fd, buff, size, 0 ); if (n > 0 ) { return n; } if (n == 0 ) { return 0 ; } if (errno == EAGAIN) { return -1 ; } if (errno == EINTR) { ngx_log_stderr (errno,"CSocekt::sendproc()中send()失败." ); } else { return -2 ; } } }
ngx_write_request_handler(epoll通知后就调用这个函数)* 调用sendproc(pConn,pConn->psendbuf,pConn->isendlen);发送数据
1.数据只发送了一部分,return返回 ,之后如果发送缓冲区可写epoll还会通知
2.成功的发送完了所有的数据,就用EPOLL_MOD参数将写事件通知给去掉,epoll将不会继续监听写事件
3.数据发送完毕那么就可以sem_post(&m_semEventSendQueue)试着通知ServerSendQueueThread可以继续发送数据了
4.最后清空堆中存放消息的那块内存return返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 void CSocekt::ngx_write_request_handler (lpngx_connection_t pConn) { CMemory *p_memory = CMemory::GetInstance (); ssize_t sendsize = sendproc (pConn,pConn->psendbuf,pConn->isendlen); if (sendsize > 0 && sendsize != pConn->isendlen) { pConn->psendbuf = pConn->psendbuf + sendsize; pConn->isendlen = pConn->isendlen - sendsize; return ; } else if (sendsize == -1 ) { ngx_log_stderr (errno,"CSocekt::ngx_write_request_handler()时if(sendsize == -1)成立,这很怪异。" ); return ; } if (sendsize > 0 && sendsize == pConn->isendlen) { if (ngx_epoll_oper_event ( pConn->fd, EPOLL_CTL_MOD, EPOLLOUT, 1 , pConn ) == -1 ) { ngx_log_stderr (errno,"CSocekt::ngx_write_request_handler()中ngx_epoll_oper_event()失败。" ); } } if (sem_post (&m_semEventSendQueue)==-1 ) ngx_log_stderr (0 ,"CSocekt::ngx_write_request_handler()中sem_post(&m_semEventSendQueue)失败." ); p_memory->FreeMemory (pConn->psendMemPointer); pConn->psendMemPointer = NULL ; --pConn->iThrowsendCount; return ; }
连接池 该结构表示一个TCP连接【客户端主动发起的、Nginx服务器被动接受的TCP连接】,主要包含:
套接字fd
连接序号
读写事件的处理函数
收包缓冲区和收包状态机
发包缓冲区
连接池类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 struct ngx_connection_s { ngx_connection_s (); virtual ~ngx_connection_s (); void GetOneToUse () ; void PutOneToFree () ; int fd; lpngx_listening_t listening; uint64_t iCurrsequence; struct sockaddr s_sockaddr ; ngx_event_handler_pt rhandler; ngx_event_handler_pt whandler; uint32_t events; unsigned char curStat; char dataHeadInfo[_DATA_BUFSIZE_]; char *precvbuf; unsigned int irecvlen; char *precvMemPointer; pthread_mutex_t logicPorcMutex; std::atomic<int > iThrowsendCount; char *psendMemPointer; char *psendbuf; unsigned int isendlen; time_t inRecyTime; time_t lastPingTime; uint64_t FloodkickLastTime; int FloodAttackCount; std::atomic<int > iSendCount; lpngx_connection_t next; };
initconnection初始化连接池 连接池的所有连接都放入m_connectionList,空闲连接都放入m_freeconnectionList,以后要取空闲连接就从空闲连接中取得。注意两个存储连接的链表类型是list,存储的是指针,指向具体连接池某一连接的内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void CSocekt::initconnection () { lpngx_connection_t p_Conn; CMemory *p_memory = CMemory::GetInstance (); int ilenconnpool = sizeof (ngx_connection_t ); for (int i = 0 ; i < m_worker_connections; ++i) { p_Conn = (lpngx_connection_t )p_memory->AllocMemory (ilenconnpool,true ); p_Conn = new (p_Conn) ngx_connection_t (); p_Conn->GetOneToUse (); m_connectionList.push_back (p_Conn); m_freeconnectionList.push_back (p_Conn); } m_free_connection_n = m_total_connection_n = m_connectionList.size (); return ; }
GetOneToUse()连接初始化 每次初始化connection都会++iCurrsequence;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void ngx_connection_s::GetOneToUse () { ++iCurrsequence; fd = -1 ; curStat = _PKG_HD_INIT; precvbuf = dataHeadInfo; irecvlen = sizeof (COMM_PKG_HEADER); precvMemPointer = NULL ; iThrowsendCount = 0 ; psendMemPointer = NULL ; events = 0 ; lastPingTime = time (NULL ); FloodkickLastTime = 0 ; FloodAttackCount = 0 ; iSendCount = 0 ; }
ngx_get_connection 多线程操纵链表CLock lock(&m_connectionMutex); 因此需要加锁
从空闲队列取出连接池的连接并且调用GetOneToUse初始化连接,再绑定当前socket的fd。返回连接return p_Conn
没有空闲连接则创建一个新的连接并且要放入总表队列调用GetOneToUse初始化连接,再绑定当前socket的fd。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 lpngx_connection_t CSocekt::ngx_get_connection (int isock) { CLock lock (&m_connectionMutex) ; if (!m_freeconnectionList.empty ()) { lpngx_connection_t p_Conn = m_freeconnectionList.front (); m_freeconnectionList.pop_front (); p_Conn->GetOneToUse (); --m_free_connection_n; p_Conn->fd = isock; return p_Conn; } CMemory *p_memory = CMemory::GetInstance (); lpngx_connection_t p_Conn = (lpngx_connection_t )p_memory->AllocMemory (sizeof (ngx_connection_t ),true ); p_Conn = new (p_Conn) ngx_connection_t (); p_Conn->GetOneToUse (); m_connectionList.push_back (p_Conn); ++m_total_connection_n; p_Conn->fd = isock; return p_Conn; }
clearconnection 清空整个连接池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void CSocekt::clearconnection () { lpngx_connection_t p_Conn; CMemory *p_memory = CMemory::GetInstance (); while (!m_connectionList.empty ()) { p_Conn = m_connectionList.front (); m_connectionList.pop_front (); p_Conn->~ngx_connection_t (); p_memory->FreeMemory (p_Conn); } }
ngx_free_connection()立即回收 用户没有三次握手接入之前我们可以直接立即回收
PutOneToFree中会将此连接的序列号iCurrsequence自加1,以避免过期包的发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void CSocekt::ngx_free_connection (lpngx_connection_t pConn) { CLock lock (&m_connectionMutex) ; pConn->PutOneToFree (); m_freeconnectionList.push_back (pConn); ++m_free_connection_n; return ; }
PutOneToFree 收到的包不全并且用户退出了,有必要将收到一半的包的内存释放掉
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void ngx_connection_s::PutOneToFree () { ++iCurrsequence; if (precvMemPointer != NULL ) { CMemory::GetInstance ()->FreeMemory (precvMemPointer); precvMemPointer = NULL ; } if (psendMemPointer != NULL ) { CMemory::GetInstance ()->FreeMemory (psendMemPointer); psendMemPointer = NULL ; } iThrowsendCount = 0 ; }
inRecyConnectQueue延时回收 用户三次握手进来了,但是断了,还是采用延时回收吧,延时回收也会将连接的序列号pConn->iCurrsequence自加1,以避免过期包的发送
放入CSocekt::list<lpngx_connection_t>m_recyconnectionList
然后ServerRecyConnectionThread线程自会处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void CSocekt::inRecyConnectQueue (lpngx_connection_t pConn) { std::list<lpngx_connection_t >::iterator pos; bool iffind = false ; CLock lock (&m_recyconnqueueMutex) ; for (pos = m_recyconnectionList.begin (); pos != m_recyconnectionList.end (); ++pos) { if ((*pos) == pConn) { iffind = true ; break ; } } if (iffind == true ) { return ; } pConn->inRecyTime = time (NULL ); ++pConn->iCurrsequence; m_recyconnectionList.push_back (pConn); ++m_totol_recyconnection_n; --m_onlineUserCount; return ; }
ServerRecyConnectionThread处理连接回收的线程 伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 void * CSocekt::ServerRecyConnectionThread (void * threadData) while (1 ) usleep (200 * 1000 ) ; pthread_mutex_lock (&pSocketObj->m_recyconnqueueMutex); for (; pos != posend; ++pos) if (进入回收站时间+等待回收时间>当前时间)continue ; m_recyconnectionList.erase (pos); ngx_free_connection (p_Conn); pthread_mutex_unlock (&pSocketObj->m_recyconnqueueMutex); if (g_stopEvent == 1 ) 做上面的相同行为而且不加时间判断,对所有连接全回收 endwhile (1 )
CSocket的静态成员函数,与线程池无关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 void * CSocekt::ServerRecyConnectionThread (void * threadData) { ThreadItem *pThread = static_cast <ThreadItem*>(threadData); CSocekt *pSocketObj = pThread->_pThis; time_t currtime; int err; std::list<lpngx_connection_t >::iterator pos,posend; lpngx_connection_t p_Conn; while (1 ) { usleep (200 * 1000 ); if (pSocketObj->m_totol_recyconnection_n > 0 ) { currtime = time (NULL ); err = pthread_mutex_lock (&pSocketObj->m_recyconnqueueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerRecyConnectionThread()中pthread_mutex_lock()失败,返回的错误码为%d!" ,err); lblRRTD: pos = pSocketObj->m_recyconnectionList.begin (); posend = pSocketObj->m_recyconnectionList.end (); for (; pos != posend; ++pos) { p_Conn = (*pos); if ( ( (p_Conn->inRecyTime + pSocketObj->m_RecyConnectionWaitTime) > currtime) && (g_stopEvent == 0 ) ) { continue ; } if (p_Conn->iThrowsendCount > 0 ) { ngx_log_stderr (0 ,"CSocekt::ServerRecyConnectionThread()中到释放时间却发现p_Conn.iThrowsendCount!=0,这个不该发生" ); } --pSocketObj->m_totol_recyconnection_n; pSocketObj->m_recyconnectionList.erase (pos); pSocketObj->ngx_free_connection (p_Conn); goto lblRRTD; } err = pthread_mutex_unlock (&pSocketObj->m_recyconnqueueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerRecyConnectionThread()pthread_mutex_unlock()失败,返回的错误码为%d!" ,err); } if (g_stopEvent == 1 ) { if (pSocketObj->m_totol_recyconnection_n > 0 ) { err = pthread_mutex_lock (&pSocketObj->m_recyconnqueueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerRecyConnectionThread()中pthread_mutex_lock2()失败,返回的错误码为%d!" ,err); lblRRTD2: pos = pSocketObj->m_recyconnectionList.begin (); posend = pSocketObj->m_recyconnectionList.end (); for (; pos != posend; ++pos) { p_Conn = (*pos); --pSocketObj->m_totol_recyconnection_n; pSocketObj->m_recyconnectionList.erase (pos); pSocketObj->ngx_free_connection (p_Conn); goto lblRRTD2; } err = pthread_mutex_unlock (&pSocketObj->m_recyconnqueueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerRecyConnectionThread()pthread_mutex_unlock2()失败,返回的错误码为%d!" ,err); } break ; } } return (void *)0 ; }
线程池 线程类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class CThreadPool { public : CThreadPool(); ~CThreadPool(); public : bool Create (int threadNum) ; void StopAll () ; void inMsgRecvQueueAndSignal (char *buf) ; void Call () ; int getRecvMsgQueueCount () {return m_iRecvMsgQueueCount;} private : static void * ThreadFunc (void *threadData) ; void clearMsgRecvQueue () ; private : struct ThreadItem { pthread_t _Handle; CThreadPool *_pThis; bool ifrunning; ThreadItem(CThreadPool *pthis):_pThis(pthis),ifrunning(false ){} ~ThreadItem(){} }; private : static pthread_mutex_t m_pthreadMutex; static pthread_cond_t m_pthreadCond; static bool m_shutdown; int m_iThreadNum; std ::atomic<int > m_iRunningThreadNum; time_t m_iLastEmgTime; std ::vector <ThreadItem *> m_threadVector; std ::list <char *> m_MsgRecvQueue; int m_iRecvMsgQueueCount; };
创建线程池(worker进程中执行) Create()会激发线程入口函数ThreadFunc 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 bool CThreadPool::Create (int threadNum) { ThreadItem *pNew; int err; m_iThreadNum = threadNum; for (int i = 0 ; i < m_iThreadNum; ++i) { m_threadVector.push_back (pNew = new ThreadItem (this )); err = pthread_create (&pNew->_Handle, NULL , ThreadFunc, pNew); if (err != 0 ) { ngx_log_stderr (err,"CThreadPool::Create()创建线程%d失败,返回的错误码为%d!" ,i,err); return false ; } else { } } std::vector<ThreadItem*>::iterator iter; lblfor: for (iter = m_threadVector.begin (); iter != m_threadVector.end (); iter++) { if ( (*iter)->ifrunning == false ) { usleep (100 * 1000 ); goto lblfor; } } return true ; }
注意m_threadVector.push_back(pNew = new ThreadItem(this));
这个this指针是线程池CThreadPool的指针,通过这一句指向CthreadPool的指针就传入ThreadItem中去了。
CThreadPool是线程池的管理类,整个服务器只需这一个对象即可
ThreadItem是线程的结构,包含线程句柄,线程各个状态等等,CthreadPool::vector<ThreadItem *> m_threadVector;
就是线程 容器,容器里就是各个线程了
特别注意: 【很重要】
线程池要求都执行阻塞到这一行pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
,在此之前create()函数不允许返回。因为如果不这样的话,先开的线程可能create()函数已经执行完毕了并且开始执行比如StopAll()函数进行修改甚至关闭了线程池资源了,但是所有线程还没有完全启动,这样会导致线程池异常。
所以上面的lblfor循环是为了保证所有线程完全启动起来,以保证整个线程池中的线程正常工作。
ThreadFunc()线程入口函数 精华代码:
1 2 3 4 5 6 err = pthread_mutex_lock (&m_pthreadMutex); while ( (pThreadPoolObj->m_MsgRecvQueue.size () == 0 ) && m_shutdown == false ) { if (pThread->ifrunning == false ) pThread->ifrunning = true ; pthread_cond_wait (&m_pthreadCond, &m_pthreadMutex); }
注意在CThreadPool类中static void* ThreadFunc(void *threadData);
是一个静态函数,不存在this指针,因此临时定义CThreadPool *pThreadPoolObj = pThread->_pThis;
注意m_pthreadCond是一个静态成员static pthread_cond_t m_pthreadCond;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; //初始化
因此对于m_pthreadCond 而言pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
刚开始时初始状态,没有什么东西来激发它,会卡在这里,而且m_pthreadMutex会被释放掉; 第一个线程执行到这一句的时候,m_pthreadMutex会被释放掉,第二个线程得以在while循环中往下执行。如果有100个线程,最终结果是100个线程都会卡在这里并且m_pthreadMutex会被释放掉。这100个线程都在等待m_pthreadCond这个条件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 void * CThreadPool::ThreadFunc (void * threadData) { ThreadItem *pThread = static_cast <ThreadItem*>(threadData); CThreadPool *pThreadPoolObj = pThread->_pThis; CMemory *p_memory = CMemory::GetInstance (); int err; pthread_t tid = pthread_self (); while (true ) { err = pthread_mutex_lock (&m_pthreadMutex); if (err != 0 ) ngx_log_stderr (err,"CThreadPool::ThreadFunc()中pthread_mutex_lock()失败,返回的错误码为%d!" ,err); while ( (pThreadPoolObj->m_MsgRecvQueue.size () == 0 ) && m_shutdown == false ) { if (pThread->ifrunning == false ) pThread->ifrunning = true ; pthread_cond_wait (&m_pthreadCond, &m_pthreadMutex); } if (m_shutdown) { pthread_mutex_unlock (&m_pthreadMutex); break ; } char *jobbuf = pThreadPoolObj->m_MsgRecvQueue.front (); pThreadPoolObj->m_MsgRecvQueue.pop_front (); --pThreadPoolObj->m_iRecvMsgQueueCount; err = pthread_mutex_unlock (&m_pthreadMutex); if (err != 0 ) ngx_log_stderr (err,"CThreadPool::ThreadFunc()中pthread_mutex_unlock()失败,返回的错误码为%d!" ,err); ++pThreadPoolObj->m_iRunningThreadNum; g_socket.threadRecvProcFunc (jobbuf); p_memory->FreeMemory (jobbuf); --pThreadPoolObj->m_iRunningThreadNum; } return (void *)0 ; }
线程处理消息队列 所有线程都卡在pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
才能初始化线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 typedef bool (CLogicSocket::*handler) ( lpngx_connection_t pConn, LPSTRUC_MSG_HEADER pMsgHeader, char *pPkgBody, unsigned short iBodyLength) ; static const handler statusHandler[] = { &CLogicSocket::_HandlePing, NULL , NULL , NULL , NULL , &CLogicSocket::_HandleRegister, &CLogicSocket::_HandleLogIn, }; #define AUTH_TOTAL_COMMANDS sizeof(statusHandler)/sizeof(handler)
threadRecvProcFunc ThreadFunc所有线程都阻塞在pthread_cond_wait处,当有消息来时有线程取到消息后会调用当前函数
1.用强制类型转换取得消息头和包头两个结构体。从包头中取出包的长度(注意要ntohs网络序转本机序)。
2.如果没有包体只有包头,crc32校验码应该是0,若不为0丢弃包return。
3.如果有包体,拿到包体并且服务器通过包体计算得到的crc32校验码,然后与客户端传来pPkgHeader->crc32校验码比较是否一致,若不一致丢弃且return。
4.然后通过消息头取出连接池的连接指针p_Conn和消息头的iCurrsequence,然后比较连接池的此连接的p_Conn->iCurrsequence与消息头的iCurrsequence是否一致,若不一致说明连接已经关闭了。丢弃包直接return。
1 2 3 4 typedef struct _STRUC_MSG_HEADER { lpngx_connection_t pConn; uint64_t iCurrsequence; }STRUC_MSG_HEADER,*LPSTRUC_MSG_HEADER;
5.判断长度是否大于AUTH_TOTAL_COMMANDS,若大于是恶意包,根本没有这么多命令,丢弃包并且return。
6.(this->*statusHandler[imsgCode])(p_Conn,pMsgHeader,(char *)pPkgBody,pkglen-m_iLenPkgHeader);
根据客户端发来包头内部的消息类型代码(区别每个不同的命令)调用对应的函数实现各个不同消息类型的处理逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 void CLogicSocket::threadRecvProcFunc (char *pMsgBuf) { LPSTRUC_MSG_HEADER pMsgHeader = (LPSTRUC_MSG_HEADER)pMsgBuf; LPCOMM_PKG_HEADER pPkgHeader = (LPCOMM_PKG_HEADER)(pMsgBuf+m_iLenMsgHeader); void *pPkgBody; unsigned short pkglen = ntohs (pPkgHeader->pkgLen); if (m_iLenPkgHeader == pkglen) { if (pPkgHeader->crc32 != 0 ) { return ; } pPkgBody = NULL ; } else { pPkgHeader->crc32 = ntohl (pPkgHeader->crc32); pPkgBody = (void *)(pMsgBuf+m_iLenMsgHeader+m_iLenPkgHeader); int calccrc = CCRC32::GetInstance ()->Get_CRC ((unsigned char *)pPkgBody,pkglen-m_iLenPkgHeader); if (calccrc != pPkgHeader->crc32) { ngx_log_stderr (0 ,"CLogicSocket::threadRecvProcFunc()中CRC错误[服务器:%d/客户端:%d],丢弃数据!" ,calccrc,pPkgHeader->crc32); return ; } else { } } unsigned short imsgCode = ntohs (pPkgHeader->msgCode); lpngx_connection_t p_Conn = pMsgHeader->pConn; if (p_Conn->iCurrsequence != pMsgHeader->iCurrsequence) { return ; } if (imsgCode >= AUTH_TOTAL_COMMANDS) { ngx_log_stderr (0 ,"CLogicSocket::threadRecvProcFunc()中imsgCode=%d消息码不对!" ,imsgCode); return ; } if (statusHandler[imsgCode] == NULL ) { ngx_log_stderr (0 ,"CLogicSocket::threadRecvProcFunc()中imsgCode=%d消息码找不到对应的处理函数!" ,imsgCode); return ; } (this ->*statusHandler[imsgCode])(p_Conn,pMsgHeader,(char *)pPkgBody,pkglen->m_iLenPkgHeader); return ; }
释放线程池 虽然一般不会调用这个函数,而且实在不行直接关闭程序系统帮我们释放资源,但是为了优雅一点自己实现一下。
StopAll() 1 2 3 4 5 6 err = pthread_mutex_lock (&m_pthreadMutex); while ( (pThreadPoolObj->m_MsgRecvQueue.size () == 0 ) && m_shutdown == false ) { if (pThread->ifrunning == false ) pThread->ifrunning = true ; pthread_cond_wait (&m_pthreadCond, &m_pthreadMutex); }
先将所有线程安全退出后,再将内存释放。“优雅干净”
1.首先给个判断m_shutdown避免重复释放,然后m_shutdown置为true表示要关闭线程池了,这是个静态变量static bool CThreadPool::m_shutdown = false;
2.pthread_cond_broadcast广播会激发ThreadPool的静态成员m_pthreadCond,一旦激发成功pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex); 的线程被唤醒了,并且这个while循环条件不满足,所有线程都去拿锁m_pthreadMutex,其他未拿到锁的线程只能卡死,等待上个线程退出释放锁,最终所有线程return退出。
3.for循环pthread_join回收退出的线程资源直至所有线程都退出
4.通过m_threadVector容器中的成员指针将之前new出来的ThreadItem内存释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void CThreadPool::StopAll () { if (m_shutdown == true ) { return ; } m_shutdown = true ; int err = pthread_cond_broadcast(&m_pthreadCond); if (err != 0 ) { ngx_log_stderr(err,"CThreadPool::StopAll()中pthread_cond_broadcast()失败,返回的错误码为%d!" ,err); return ; } std ::vector <ThreadItem*>::iterator iter; for (iter = m_threadVector.begin(); iter != m_threadVector.end(); iter++) { pthread_join((*iter)->_Handle, NULL ); } pthread_mutex_destroy(&m_pthreadMutex); pthread_cond_destroy(&m_pthreadCond); for (iter = m_threadVector.begin(); iter != m_threadVector.end(); iter++) { if (*iter) delete *iter; } m_threadVector.clear(); ngx_log_stderr(0 ,"CThreadPool::StopAll()成功返回,线程池中线程全部正常结束!" ); return ; }
业务逻辑 线程池里面地线程都“嗷嗷待哺”地等待客户端发来消息,线程池地线程都会执行threadRecvProcFunc函数,这个函数会根据发来的消息包的不同执行不同的逻辑函数。目前逻辑并不多,只处理了一个心跳包的逻辑。
心跳包 心跳包其实就是 一个普通的数据包;
一般每个几十秒,最长一般也就是1分钟【10秒-60秒之间】,有客户端主动发送给服务器;服务器收到之后,一般会给客户端返回一个心跳包;
三路握手,tcp连接建立之后,才存在发送心跳包的问题—— 如果c不给s发心跳包,服务器会怎样;约定 30秒发送 一次; 服务器可能会在90秒或者100秒内,主动关闭该客户端的socket连接;
作为一个好的客户端程序,如果你发送了心跳包给服务器,但是在90或者100秒之内,你[客户端]没有收到服务器回应的心跳包,那么你就应该主动关闭与服务器端的链接,并且如果业务需要重连,客户端程序在关闭这个连接后还要重新主动再次尝试连接服务器端;客户端程序 也有必要提示使用者 与服务器的连接已经断开;
为什么引入心跳包?
常规客户端关闭,服务器端能感知到;但是有一种特殊情况,连接断开c/s都感知不到;
c /s程序运行在不同的两个物理电脑上;tcp已经建立; 拔掉c /s程序的网线; 拔掉网线导致服务器感知不到客户端断开 ,这个事实,一定要知道; 为了应对拔网线,导致不知道对方是否断开了tcp连接这种事,这就是我们引入心跳包机制的原因; 超时没有发送来心跳包,那么就会将对端的socket连接close掉,回收资源;这就是心跳包的作用; 其他作用:检测网络延迟等等 这里心跳包主要目的就是检测双方的链接是否断开;
tcp本身keepalive机制;因为检测时间不好控制,所以不适合我们;
因此连接池的每个连接引入一个成员变量lastPingTime记录上一次的ping命令(心跳包)的时间,不断地更新
处理发来的心跳包 _HandlePing 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 bool CLogicSocket::_HandlePing(lpngx_connection_t pConn,LPSTRUC_MSG_HEADER pMsgHeader,char *pPkgBody,unsigned short iBodyLength){ if (iBodyLength != 0 ) return false ; CLock lock (&pConn->logicPorcMutex) ; pConn->lastPingTime = time (NULL ); SendNoBodyPkgToClient (pMsgHeader,_CMD_PING); return true ; }
SendNoBodyPkgToClient 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void CLogicSocket::SendNoBodyPkgToClient (LPSTRUC_MSG_HEADER pMsgHeader,unsigned short iMsgCode) { CMemory *p_memory = CMemory::GetInstance (); char *p_sendbuf = (char *)p_memory->AllocMemory (m_iLenMsgHeader+m_iLenPkgHeader,false ); char *p_tmpbuf = p_sendbuf; memcpy (p_tmpbuf,pMsgHeader,m_iLenMsgHeader); p_tmpbuf += m_iLenMsgHeader; LPCOMM_PKG_HEADER pPkgHeader = (LPCOMM_PKG_HEADER)p_tmpbuf; pPkgHeader->msgCode = htons (iMsgCode); pPkgHeader->pkgLen = htons (m_iLenPkgHeader); pPkgHeader->crc32 = 0 ; msgSend (p_sendbuf); return ; }
检测心跳时间 配置文件 20秒;超过20*3 +10 =70秒,仍旧没收到心跳包,那么服务器端就把tcp断开;或者20秒直接断开TCP连接 增加配置Sock_WaitTimeEnable,Sock_MaxWaitTime
1 2 3 4 5 6 #Sock_WaitTimeEnable:是否开启踢人时钟,1:开启 0:不开启 Sock_WaitTimeEnable = 1 #多少秒检测一次是否 心跳超时,只有当Sock_WaitTimeEnable = 1时,本项才有用 Sock_MaxWaitTime = 20 #当时间到达Sock_MaxWaitTime指定的时间时,直接把客户端踢出去,只有当Sock_WaitTimeEnable = 1时,本项才有用 Sock_TimeOutKick = 0
CSocekt::AddToTimerQueue 在ngx_event_accept (三次握手成功后)调用AddToTimerQueue()添加一个定时器 。每次进来一个用户,就往时间队列multimap(有序的键/值对,但它可以保存重复的元素)增加一个连接。 每次插入时间队列会按键值 自动排序 小->大。并且将时间队列头部时间值(最早时间) 保存到m_timer_value_里std::multimap<time_t, LPSTRUC_MSG_HEADER> m_timerQueuemap;
键:时间,值:消息头(消息头存放连接指针和连接序号)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void CSocekt::AddToTimerQueue (lpngx_connection_t pConn) { CMemory *p_memory = CMemory::GetInstance (); time_t futtime = time (NULL ); futtime += m_iWaitTime; CLock lock (&m_timequeueMutex) ; LPSTRUC_MSG_HEADER tmpMsgHeader = (LPSTRUC_MSG_HEADER)p_memory->AllocMemory (m_iLenMsgHeader,false ); tmpMsgHeader->pConn = pConn; tmpMsgHeader->iCurrsequence = pConn->iCurrsequence; m_timerQueuemap.insert (std::make_pair (futtime,tmpMsgHeader)); m_cur_size_++; m_timer_value_ = GetEarliestTime (); return ; }
CSocekt::ServerSendQueueThread处理时间队列线程 创建一个新线程,专门处理事件队列里心跳包未发送的连接
每次取出m_timer_value_最早时间,判断有没有连接是已经过期的(过久未发心跳包)
通过GetOverTimeTimer根据给的当前时间,从m_timeQueuemap找到比这个时间更老(更早)的节点【1个】返回去,这些节点都是时间超过了,要处理的节点
然后对要处理的时间过期节点,该去检测心跳包是否超时的事宜,是否要踢出这个连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 void * CSocekt::ServerTimerQueueMonitorThread (void * threadData) { ThreadItem *pThread = static_cast <ThreadItem*>(threadData); CSocekt *pSocketObj = pThread->_pThis; time_t absolute_time,cur_time; int err; while (g_stopEvent == 0 ) { if (pSocketObj->m_cur_size_ > 0 ) { absolute_time = pSocketObj->m_timer_value_; cur_time = time (NULL ); if (absolute_time < cur_time) { std::list<LPSTRUC_MSG_HEADER> m_lsIdleList; LPSTRUC_MSG_HEADER result; err = pthread_mutex_lock (&pSocketObj->m_timequeueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerTimerQueueMonitorThread()中pthread_mutex_lock()失败,返回的错误码为%d!" ,err); while ((result = pSocketObj->GetOverTimeTimer (cur_time)) != NULL ) { m_lsIdleList.push_back (result); } err = pthread_mutex_unlock (&pSocketObj->m_timequeueMutex); if (err != 0 ) ngx_log_stderr (err,"CSocekt::ServerTimerQueueMonitorThread()pthread_mutex_unlock()失败,返回的错误码为%d!" ,err); LPSTRUC_MSG_HEADER tmpmsg; while (!m_lsIdleList.empty ()) { tmpmsg = m_lsIdleList.front (); m_lsIdleList.pop_front (); pSocketObj->procPingTimeOutChecking (tmpmsg,cur_time); } } } usleep (500 * 1000 ); } return (void *)0 ; }
CLogicSocket::procPingTimeOutChecking 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void CLogicSocket::procPingTimeOutChecking (LPSTRUC_MSG_HEADER tmpmsg,time_t cur_time) { CMemory *p_memory = CMemory::GetInstance (); if (tmpmsg->iCurrsequence == tmpmsg->pConn->iCurrsequence) { lpngx_connection_t p_Conn = tmpmsg->pConn; if (m_ifTimeOutKick == 1 ) { zdClosesocketProc (p_Conn); } else if ( (cur_time - p_Conn->lastPingTime ) > (m_iWaitTime*3 +10 ) ) { zdClosesocketProc (p_Conn); } p_memory->FreeMemory (tmpmsg); } else { p_memory->FreeMemory (tmpmsg); } return ; }
CSocekt::DeleteFromTimerQueue zdClosesocketProc(p_Conn)会调用此函数,主要从时间队列中删除并且释放内存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void CSocekt::DeleteFromTimerQueue (lpngx_connection_t pConn) { std::multimap<time_t , LPSTRUC_MSG_HEADER>::iterator pos,posend; CMemory *p_memory = CMemory::GetInstance (); CLock lock (&m_timequeueMutex) ; lblMTQM: pos = m_timerQueuemap.begin (); posend = m_timerQueuemap.end (); for (; pos != posend; ++pos) { if (pos->second->pConn == pConn) { p_memory->FreeMemory (pos->second); m_timerQueuemap.erase (pos); --m_cur_size_; goto lblMTQM; } } if (m_cur_size_ > 0 ) { m_timer_value_ = GetEarliestTime (); } return ; }
其他模块 读取配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #是注释行, #每个有效配置项用 等号 处理,等号前不超过40个字符,等号后不超过400个字符; #[开头的表示组信息,也等价于注释行 #[Socket] #ListenPort = 5678 #DBInfo = 127.0.0.1;1234;myr;123456;mxdb_g #日志相关 [Log] #日志文件输出目录和文件名 #Log=logs/error.log Log=error.log #只打印日志等级<= 数字 的日志到日志文件中 ,日志等级0-8,0级别最高,8级别最低。 LogLevel = 8 #进程相关 [Proc] #创建 这些个 worker进程 WorkerProcesses = 4 #是否按守护进程方式运行,1:按守护进程方式运行,0:不按守护进程方式运行 Daemon = 1 #处理接收到的消息的线程池中线程数量,不建议超过300 ProcMsgRecvWorkThreadCount = 120 #和网络相关 [Net] #监听的端口数量,一般都是1个,当然如果支持多于一个也是可以的 ListenPortCount = 1 #ListenPort+数字【数字从0开始】,这种ListenPort开头的项有几个,取决于ListenPortCount的数量, ListenPort0 = 80 #ListenPort1 = 443 #epoll连接的最大数【是每个worker进程允许连接的客户端数】,实际其中有一些连接要被监听socket使用,实际允许的客户端连接数会比这个数小一些 worker_connections = 2048 #Sock_RecyConnectionWaitTime:为确保系统稳定socket关闭后资源不会立即收回,而要等一定的秒数,在这个秒数之后,才进行资源/连接的回收 Sock_RecyConnectionWaitTime = 150 #Sock_WaitTimeEnable:是否开启踢人时钟,1:开启 0:不开启 Sock_WaitTimeEnable = 1 #多少秒检测一次是否 心跳超时,只有当Sock_WaitTimeEnable = 1时,本项才有用 Sock_MaxWaitTime = 20 #当时间到达Sock_MaxWaitTime指定的时间时,直接把客户端踢出去,只有当Sock_WaitTimeEnable = 1时,本项才有用 Sock_TimeOutKick = 0 #和网络安全相关 [NetSecurity] #flood检测 #Flood攻击检测是否开启,1:开启 0:不开启 Sock_FloodAttackKickEnable = 1 #Sock_FloodTimeInterval表示每次收到数据包的时间间隔是100(单位:毫秒) Sock_FloodTimeInterval = 100 #Sock_FloodKickCounter表示计算到连续10次,每次100毫秒时间间隔内发包,就算恶意入侵,把他kick出去 Sock_FloodKickCounter = 10
这种配置文件依赖于自己的想法设定,没有固定格式,主要还是看如何读取配置文件的各个参数的信息,这才是关键所在。
最终所有的信息都保存到了CConfig:: vector m_ConfigItemList; 存储配置信息的列表。之后我们想取出配置信息就从这个容器中取出即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 bool CConfig::Load (const char *pconfName) { FILE *fp; fp = fopen (pconfName,"r" ); if (fp == NULL ) return false ; char linebuf[501 ]; while (!feof (fp)) { if (fgets (linebuf,500 ,fp) == NULL ) continue ; if (linebuf[0 ] == 0 ) continue ; if (*linebuf==';' || *linebuf==' ' || *linebuf=='#' || *linebuf=='\t' || *linebuf=='\n' ) continue ; lblprocstring: if (strlen (linebuf) > 0 ) { if (linebuf[strlen (linebuf)-1 ] == 10 || linebuf[strlen (linebuf)-1 ] == 13 || linebuf[strlen (linebuf)-1 ] == 32 ) { linebuf[strlen (linebuf)-1 ] = 0 ; goto lblprocstring; } } if (linebuf[0 ] == 0 ) continue ; if (*linebuf=='[' ) continue ; char *ptmp = strchr (linebuf,'=' ); if (ptmp != NULL ) { LPCConfItem p_confitem = new CConfItem; memset (p_confitem,0 ,sizeof (CConfItem)); strncpy (p_confitem->ItemName,linebuf,(int )(ptmp-linebuf)); strcpy (p_confitem->ItemContent,ptmp+1 ); Rtrim (p_confitem->ItemName); Ltrim (p_confitem->ItemName); Rtrim (p_confitem->ItemContent); Ltrim (p_confitem->ItemContent); m_ConfigItemList.push_back (p_confitem); } } fclose (fp); return true ; }
设置文件标题 argc:命令行参数的个数 argv:是个数组,每个数组元素都是指向一个字符串的char *,里边存储的内容是所有命令行参数; 比如你输入 ./nginx -12 -v 568 -q gess
argv内存之后,接着连续的就是环境变量参数信息内存 【是咱们这个可执行程序执行时有关的所有环境变量参数信息】可以通过一个全局的environ[char **]就可以访问
environ内存和argv内存紧紧的挨着
为了修改可执行程序的命令行参数,我们必须修改argv参数而且千万不可以影响到环境变量参数信息 实现思路: (1)重新分配一块内存,用来保存environ中的内容; (2)修改argv[0]所指向的内存;
ngx_init_setproctitle环境变量拷贝到新内存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void ngx_init_setproctitle () { gp_envmem = new char [g_envneedmem]; memset (gp_envmem,0 ,g_envneedmem); char *ptmp = gp_envmem; for (int i = 0 ; environ[i]; i++) { size_t size = strlen (environ[i])+1 ; strcpy (ptmp,environ[i]); environ[i] = ptmp; ptmp += size; } return ; }
ngx_setproctitle设置可执行程序标题 主要思路是:
获取argv[]和environ内存总长度
将标题title复制到argv起始位置
将剩下未用上的长度全部清空
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void ngx_setproctitle (const char *title) { size_t ititlelen = strlen (title); size_t esy = g_argvneedmem + g_envneedmem; if ( esy <= ititlelen) { return ; } g_os_argv[1 ] = NULL ; char *ptmp = g_os_argv[0 ]; strcpy (ptmp,title); ptmp += ititlelen; size_t cha = esy - ititlelen; memset (ptmp,0 ,cha); return ; }
日志模块 日志功能
测试 如何把发送缓冲区撑满 (1)每次服务器给客户端发送65K左右的数据,发送到第20次才出现服务器的发送缓冲区满;这时**客户端收了1个包(65K)**,【触发了epoll可写事件,此时执行了 ngx_write_request_handler()】
(2)我又发包,连续成功发送了16次,才又出现发送缓冲区满;我客户端再收包,结果连续收了16个包 ,服务器才又出现ngx_write_request_handler()函数被成功执行,这表示客户端连续收了16次包,服务器的发送缓冲区才倒出地方来;
(3)此后,大概服务器能够连续发送16次才再出现发送缓冲区满,客户端连续收16次,服务器端才出现ngx_write_request_handler()被执行【服务器的发送缓冲区有地方】;
测试结论:
(1)ngx_write_request_handler()逻辑正确;能够通过此函数把剩余的未成功发送的数据发送出去;
(2)LT模式下,我们发送数据采用的 改进方案 是非常有效的,在很大程度上提高了效率;
(3) 发送缓冲区大概10-几10K,但是我们实际测试的时候,成功的发送出去了1000多k数据才报告发送缓冲区满; 当我们发送端调用send()发送数据时,操作系统底层已经把数据发送到了 该连接的接收端 的接收缓存 ,这个接收缓存大概有几百K, 千万不要认为发送缓冲区只有几十K,所以我们send()几十k就能把发送缓冲区填满;
(4)不管怎么说,主要对方不接收数据,发送方的发送缓冲区总有满的时候;当发送缓冲满的时候,我们发送数据就会使用ngx_write_request_handler()来执行了,所以现在看起来,我们整个的服务器的发送数据的实现代码是正确的;
高并发测试 并发数量取决于很多因素:
(1)采用的开发技术:epoll,支持数十万并发
(2)这个程序收发数据的频繁程度,以及具体 要处理的业务复杂程度
(3)服务器实际的物理内存;可用的物理内存数量,会直接决定你能支持的并发连接
(4)一些其他的tcp/ip配置项
一般,我们日常所写的服务器程序,支持几千甚至1-2万的并发,基本上就差不多了;一个服务器程序,要根据我们具体的物理内存,以及我们具体要实现的业务等等因素,控制能够同时连入的客户端数量;如果你允许客户端无限连入,那么你的服务器肯定会崩溃;
这里我的解决方法是引入一个新变量m_onlineUserCount
void CSocekt::ngx_event_accept(lpngx_connection_t oldc) 连入人数+1 void CSocekt::inRecyConnectQueue(lpngx_connection_t pConn) 连入人数-1
控制连入用户数量的解决思路:如果同时连入的用户数量超过了允许的最大连入数量时,我们就把这个连入的用户直接踢出去;
安全问题思考 防范SYN Flood攻击 以游戏服务器为例:
假设我们认为一个合理的客户端一秒钟发送数据包给服务器不超过10个; 如果客户端不停的给服务器发数据包,1秒钟超过了10个数据包 ,那我服务器就认为这个玩家有恶意攻击服务器的倾向; 我们服务器就应该果断的把这个TCP客户端连接关闭,这个也是服务器发现恶意玩家以及保护自身安全的手段;
代码上如何实现 1秒钟超过10个数据包则把客户端踢出去? 增加了TestFlood();
改造了ngx_read_request_handler(),ngx_wait_request_handler_proc_p1(),在每次收到了完整包就可以调用TestFlood()
ngx_wait_request_handler_proc_plast()判断是否isflood,选择释放内存还是放入消息队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 bool CSocekt::TestFlood (lpngx_connection_t pConn) { struct timeval sCurrTime ; uint64_t iCurrTime; bool reco = false ; gettimeofday (&sCurrTime, NULL ); iCurrTime = (sCurrTime.tv_sec * 1000 + sCurrTime.tv_usec / 1000 ); if ((iCurrTime - pConn->FloodkickLastTime) < m_floodTimeInterval) { pConn->FloodAttackCount++; pConn->FloodkickLastTime = iCurrTime; } else { pConn->FloodAttackCount = 0 ; pConn->FloodkickLastTime = iCurrTime; } if (pConn->FloodAttackCount >= m_floodKickCount) { reco = true ; } return reco; }
收到太多数据包处理不过来 限速:epoll技术,一个限速的思路;在epoll红黑树节点中,把这个EPOLLIN【可读】通知干掉;系统不会通知,服务器就不会去读,数据一直积累在接收缓冲区里,客户端那边会的发送缓冲区会满,客户端会减慢速度发送甚至停止发送。
数据报太多的话,会在printTDInfo()中做了一个简单提示
积压太多数据包发送不出去 见void CSocekt::msgSend(char *psendbuf) ,增加一个判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void CSocekt::msgSend (char *psendbuf) { CMemory *p_memory = CMemory::GetInstance(); CLock lock (&m_sendMessageQueueMutex) ; if (m_iSendMsgQueueCount > 50000 ) { m_iDiscardSendPkgCount++; p_memory->FreeMemory(psendbuf); return ; } LPSTRUC_MSG_HEADER pMsgHeader = (LPSTRUC_MSG_HEADER)psendbuf; lpngx_connection_t p_Conn = pMsgHeader->pConn; if (p_Conn->iSendCount > 400 ) { ngx_log_stderr(0 ,"CSocekt::msgSend()中发现某用户%d积压了大量待发送数据包,切断与他的连接!" ,p_Conn->fd); m_iDiscardSendPkgCount++; p_memory->FreeMemory(psendbuf); zdClosesocketProc(p_Conn); return ; } ++p_Conn->iSendCount; m_MsgSendQueue.push_back(psendbuf); ++m_iSendMsgQueueCount; if (sem_post(&m_semEventSendQueue)==-1 ) { ngx_log_stderr(0 ,"CSocekt::msgSend()中sem_post(&m_semEventSendQueue)失败." ); } return ; }
连入安全的进一步完善 void CSocekt::ngx_event_accept(lpngx_connection_t oldc)
1 2 3 4 5 6 7 8 9 10 11 12 if (m_connectionList.size() > (m_worker_connections * 5 )){ if (m_freeconnectionList.size() < m_worker_connections) { close(s); return ; } }
压力测试 一般要测试很多天,跑的时间长了可能 会暴露下次,跑的时间短了可能还暴露不出来;
1 2 3 4 5 6 #define _CONNECTION_COUNT_ 2048 #define _THREAD_COUNT_ 100 #define CESHIXIBIAO _CONNECTION_COUNT_ + 2000 #define SERVERIPADDR "192.168.200.129" #define DEFAULT_PORT 80 #define _RECVTIMEOUT_ 1500
初始化socket后创建100个线程,线程具体执行如下所示:
1 2 3 4 5 6 7 8 9 10 11 ScanThread socket () connect () FuncsendrecvData () send () recv () FunccloseSocket () closesocket () ; FunccreateSocket() socket() connect();
建议:
(1)测试收包,简单的逻辑处理,发包;
(2)建议如果有多个物理电脑;客户端单独放在一个电脑;
建议用高性能linux服务器专门运行服务器程序 windows也建议单独用一个电脑来测试;
(3)测试什么?
程序崩溃,这明显不行,肯定要解决
程序运行异常,比如过几个小时,服务器连接不上了;没有回应了,你发过来的包服务器处理不了了;
服务器程序占用的内存才能不断增加,增加到一定程度,可能导致整个服务器崩溃;
top -p 子进程ID:显示进程占用的内存和cpu百分比,用q可以退出; top -p pid,推荐文章:https://www.cnblogs.com/dragonsuc/p/5512797.html
cat /proc/子进程ID/status ———其中VmRSS: 7700 kB,占用的实际内存。
最大连接是1018
日志中报:CSocekt::ngx_event_accept()中accept4()失败 这个跟 用户进程可打开的文件数限制有关; 因为系统为每个tcp连接都要创建一个socekt句柄,每个socket句柄同时也是一个文件句柄;
1 2 fengyun@ubuntu:~/share/nginx$ ulimit -n 1024
通过ulimit -n
可以看到进程允许打开的文件数目限制是1024。而减去标准输入输出,错误输出,日志文件,监听端口等这几个占用的fd后数目是1018。
我们就必须修改linux对当前用户的进程 同时打开的文件数量的限制;
惊群 惊群:1个master进程 4个worker进程
一个连接进入,惊动了4个worker进程,但是只有一个worker进程accept();其他三个worker进程被惊动,这就叫惊群;
但是,这三个被惊动的worker进程都做了无用功【操作系统本身的缺陷】;
配置nginx的worker子进程数目为4,然后借助telnet进行连接测试
1 fengyun@ubuntu:~/share/nginx$ telnet 192.168 .200 .129 80
在ngx_epoll_process_events()加入一个测试代码ngx_log_stderr(0,"惊群测试:events=%d,进程id=%d",events,ngx_pid);
可以观察到尽管只有一个telnet三次握手事件连入,但是四个worker进程都被唤醒了。
如何解决惊群问题?深入浅出 Linux 惊群:现象、原因和解决方案
epoll底层LT逻辑
1 2 [1] 遍历并清空epoll的ready list,遍历过程中,对于每个epi收集其返回的events,如果没收集到event,则continue去处理其他epi,否则将当前epi的事件和用户传入的数据都copy给用户空间,并判断,如果是在LT模式下,则将当前epi重新放回epoll的ready list [2] 遍历epoll的ready list完成后,如果ready list不为空,则继续唤醒epoll睡眠队列wq上的其他task B。task B从epoll_wait醒来继续前行,重复上面的流程,继续唤醒wq上的其他task C,这样链式唤醒下去。
例如有两个进程 A、B 睡眠在 epoll 的睡眠队列,fd 的可读事件到来唤醒进程 A,但是 A 可能很久才会去处理 fd 的事件,或者它根本就不去处理。 根据 LT 的语义,当前fd的事件未处理,因此应该要唤醒进程 B 的。
LT模式下惊群的原因
1 2 3 4 5 6 [1] epoll在ET模式下不存在“惊群”现象,LT模式是epoll“惊群”的根源,并且LT模式下的“惊群”没办法避免。 [2] LT的“惊群”是链式唤醒的,唤醒过程直到当前epi的事件被处理了,无法获得到新的事件才会终止唤醒过程。 例如有A、B、C、D...等多个进程task睡眠在epoll的睡眠队列上,并且都监控同一个listen fd的可读事件。一个请求上来,会首先唤醒A进程,A在epoll_wait的处理过程中会唤醒进程B,这样进程B在epoll_wait的处理过程中会唤醒C,这个时候A的epoll_wait处理完成返回,进程A调用accept读取了当前这个请求,进程C在自己的epoll_wait处理过程中,从epi中获取不到事件了,于是终止了整个链式唤醒过程。 [3] 多个进程的epoll fd由于指向同一个epoll内核对象,他们对epoll fd的相关epoll_ctl操作会相互影响。一不小心可能会出现一些比较诡异的行为。 想象这样一个场景(实际上应该不是这样用),有一个服务在1234,1235,1236这3个端口上提供服务,于是它epoll_create得到epoll fd后,fork出3个工作的子进程A、B、C,它们分别在这3个端口创建listen fd,然后加入到epoll中监听其可读事件。这个时候端口1234上来一个请求,A、B、C同时被唤醒,A在epoll_wait返回后,在进行accept前由于种种原因卡住了,没能及时accept。B、C在epoll_wait返回后去accept又不能accept到请求,这样B、C重新回到epoll_wait,这个时候又被唤醒,这样只要A没有去处理这个请求之前,B、C就一直被唤醒,然而B、C又无法处理该请求。 [4] ET模式下,一个fd上的同事多个事件上来,只会唤醒一个睡眠在epoll上的task,如果该task没有处理完这些事件,在没有新的事件上来前,epoll不会在通知task去处理。
官方nginx解决惊群的办法:锁,进程之间的锁;谁获得这个锁,谁就往监听端口增加EPOLLIN标记,有了这个标记,客户端连入就能够被服务器感知到;
Nginx 通过一次仅允许一个进程将 listen fd 放入自己的 epoll 来监听其 READ 事件的方式来达到 listen fd”惊群”避免。然而做好这一点并不容易,作为一个高性能 web 服务器,需要尽量避免阻塞,并且要很好平衡各个工作 worker 的请求,避免饿死情况。
Nginx 采用在同一时刻仅允许一个 worker 进程监听 listen fd 的可读事件的方式,来避免 listen fd 的”惊群”现象。然而这种方式编程实现起来比较难,难道不能像 accept 一样解决 epoll 的”惊群”问题么?答案是可以的
首先我采用的是先 fork 后 epoll_create (LT模式)
用法上,通常是在父进程创建了 listen fd 后,fork 多个 worker 子进程来共同处理同一个 listen fd 上的请求。这个时候,A、B、C…等多个子进程分别创建自己独立的 epoll fd,然后将同一个 listen fd 加入到 epoll 中,监听其可读事件。这种情况下,epoll 有以下这些特性:
1 2 [1] 由于相对同一个listen fd而言, 多个进程之间的epoll是平等的,于是,listen fd上的一个请求上来,会唤醒所有睡眠在listen fd睡眠队列上的epoll,epoll又唤醒对应的进程task,从而唤醒所有的进程(这里不管listen fd是以LT还是ET模式加入到epoll)。 [2] 多个进程间的epoll是独立的,对epoll fd的相关epoll_ctl操作相互独立不影响。
可以看出,在使用友好度方面,多进程独立 epoll 实例要比共用 epoll 实例的模式要好很多。独立 epoll 模式要解决 fd 的排他唤醒 epoll 即可。
3.9以上内核版本的linux,在内核中解决了惊群问题;而且性能比官方nginx解决办法效率高很多; reuseport【复用端口】,是一种套接字的复用机制,允许将多个套接字bind到同一个ip地址/端口上,这样一来,就可以建立多个服务器来接收到同一个端口的连接【多个worker进程能够监听同一个端口】;
于是,基本的解决方案是起多个 listen socket,好在我们有 SO_REUSEPORT(linux 3.9 以上内核支持),它支持多个进程或线程 bind 相同的 ip 和端口,支持以下特性:
1 2 3 [1] 允许多个socket bind/listen在相同的IP,相同的TCP/UDP端口 [2] 目的是同一个IP、PORT的请求在多个listen socket间负载均衡 [3] 安全上,监听相同IP、PORT的socket只能位于同一个用户下
但是注意目前master进程中在ngx_open_listening_sockets创建了一个监听套接字,创建了四个worker进程的监听套接字和master套接字是同一个,即使设置了reuseport仍然会产生惊群现象、
于是,在一个多核 CPU 的服务器上,我们通过 SO_REUSEPORT 来创建多个监听相同 IP、PORT 的 listen socket,每个进程监听不同的 listen socket 。这样,在只有 1 个新请求到达监听的端口的时候,内核只会唤醒一个进程去 accept,而在同时并发多个请求来到的时候,内核会唤醒多个进程去 accept,并且在一定程度上保证唤醒的均衡性。
看了这位腾讯 IEG 后台开发工程师的文章,我选择了试着在worker进程中使用ngx_open_listening_sockets,每个worker进程都会创建一个监听套接字listenfd,然后使用reuseport。 这样就不再造成惊群了。
性能优化 从两个方面看下性能优化问题;
软件层面:
充分利用cpu,比如惊群问题 ;
深入了解tcp/ip协议,通过一些协议参数配置来进一步改善性能;
处理业务逻辑方面,算法方面有些内容,可以提前做好;
硬件层面【花钱搞定】:
高速网卡,增加网络带宽;
专业服务器;数十个核心,马力极其强;
内存:容量大,访问速度快;
主板,总线不断升级的;
性能优化的实施 绑定cpu、提升进程优先级
一个worker进程运行在一个核上;为什么能够提高性能呢?
cpu:缓存;cpu缓存命中率问题;把进程固定到cpu核上,可以大大增加cpu缓存命中率,从而提高程序运行效率; nginx官方有一个函数worker_cpu_affinity【cpu亲和性】,就是为了把worker进程固定的绑到某个cpu核上; ngx_set_cpu_affinity,ngx_setaffinity;
提升进程优先级,这样这个进程就有机会被分配到更多的cpu时间(时间片【上下文切换】),得到执行的机会就会增多;
setpriority();
干活时进程 处于R状态,没有连接连入时,进程处于S
pidstat - w - p 3660 1 看某个进程的上下文切换次数[切换频率越低越好]
cswch/s:主动切换/秒:你还有运行时间,但是因为你等东西,你把自己挂起来了,让出了自己时间片。
nvcswch/s:被动切换/秒:时间片耗尽了,你必须要切出去;
一个服务器程序,一般只放在一个计算机上跑,专用机;
TCP / IP协议的配置选项 这些配置选项都有缺省值,通过修改,在某些场合下,对性能可能会有所提升;
若要修改这些配置项,要求做到以下几点:
对这个配置项有明确的理解;
对相关的配置项,记录他的缺省值,做出修改;
要反复不断的亲自测试,亲自验证;是否提升性能,是否有副作用;
TCP / IP协议的配置选项
绑定cpu、提升进程优先级
TCP / IP协议的配置选项
TCP/IP协议额外注意的一些算法、概念等
配置最大允许打开的文件句柄数 cat /proc/sys/fs/file-max :查看操作系统可以使用的最大句柄数 cat /proc/sys/fs/file-nr :查看当前已经分配的,分配了没使用的,文件句柄最大数目
1 2 3 4 fengyun@ubuntu:~/share/nginx$ sudo cat /proc/sys/fs/file-max 9223372036854775807 fengyun@ubuntu:~/share/nginx$ sudo cat /proc/sys/fs/file-nr 7872 0 9223372036854775807
限制用户使用的最大句柄数 /etc/security/limit.conf文件; root soft nofile 60000 :setrlimit(RLIMIT_NOFILE) root hard nofile 60000
ulimit -n :查看系统允许的当前用户进程打开的文件数限制 ulimit -HSn 5000 :临时设置,只对当前session有效; n:表示我们设置的是文件描述符 推荐文章:https://blog.csdn.net/xyang81/article/details/52779229
内存池补充说明 为什么没有用内存池技术:感觉必要性不大,等待有时间再补充一下吧。 TCMalloc,取代malloc(); 库地址:https://github.com/gperftools/gperftools