单进程/线程 TCP 通信过程中,服务器端启动之后可以同时和多个客户端建立连接,并进行网络通信。 先看一下之前的服务器代码的处理思路,再来分析代码中的不足:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> int main () { int lfd = socket (AF_INET, SOCK_STREAM, 0 ); struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons (10000 ); addr.sin_addr.s_addr = INADDR_ANY; int ret = bind (lfd, (struct sockaddr*) &addr, sizeof (addr)); ret = listen (lfd, 128 ); struct sockaddr_in caddr; int clen = sizeof (caddr); int cfd = accept (lfd, (struct sockaddr*) &caddr, &clen); while (1 ) { char buf[1024 ]; memset (buf, 0 , sizeof (buf)); int len = read (cfd, buf, sizeof (buf)); if (len > 0 ) { printf ("read client data: %s\n" , buf); write (cfd, buf, len); } else if (len == 0 ) { printf ("client closed...\n" ); break ; } else { perror ("read" ); break ; } } close (cfd); close (lfd); return 0 ; }
在上面的代码中用到了三个会引起程序阻塞 的函数,分别是:accept()
:如果服务器端没有新的客户端连接,阻塞当前进程/线程;如果检测到新的连接,解除阻塞、建立连接。read()
:如果通信的套接字对应的读缓冲区没有数据,阻塞当前进程/线程;如果检测到数据,解除阻塞、接收数据。write()
:如果通信的套接字写缓冲区被写满,阻塞当前进程/线程(这种情况比较少见)
如果需要和发起新的连接请求的客户端建立连接,那么就必须在服务器端通过一个循环调用 accept() 函数, 另外已经和服务器建立连接的客户端需要和服务器通信,发送数据时的阻塞可以忽略,当接收不到数据时程序也会被阻塞, 被 accept()
阻塞就无法通信,被 read()
阻塞就无法和客户端建立新连接。 基于上述处理方式,在单进程/线程场景下,服务器是无法处理多连接的。 解决方案也有很多,常用的有三种:
1 2 3 4 使用多进程实现 使用多线程实现 使用 IO 多路转接(复用)实现 使用 IO 多路转接 + 多线程实现
多进程并发 如果要编写多进程的并发服务器程序,首先要考虑,创建出的多个进程都是什么角色,在 Tcp 服务器端一共有两个角色,分别是:监听和通信。 监听是一个持续的动作,如果有新连接就建立连接;如果没有新连接就阻塞。 通信是需要和多个客户端同时进行的,因此需要多个进程,这样才能达到互不影响的效果。 进程也有两大类:父进程和子进程,通过分析可以这样分配进程。
父进程: 负责监听:处理客户端的连接请求,也就是在父进程中循环调用 accept()
函数 创建子进程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信 回收子进程资源:子进程退出时,回收其内核 PCB 资源,防止出现僵尸进程 子进程: 负责通信:基于父进程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送。 发送数据:send()/write() 接收数据:recv()/read()
在多进程版的服务器端程序中,多个进程是有血缘关系。 对于有血缘关系的进程来说,还需要想清楚他们有哪些资源是可以被继承的,哪些资源是独占的,以及其他细节: (1)子进程是父进程的拷贝,在子进程的内核区 PCB 中,文件描述符也是可以被拷贝的,因此在父进程可以使用的文件描述符在子进程中也有一份,并且可以使用它们做和父进程一样的事情。 (2)父子进程有各自的独立的虚拟地址空间,因此所有的资源都是独占的。 (3)为了节省系统资源,对于只有在父进程才能用到的资源,可以在子进程中将其释放掉,父进程亦如此。 (4)由于需要在父进程中做 accept() 操作,并且要释放子进程资源,如果想要高效可以使用信号的方式处理。
多进程版并发 TCP 服务器,示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <signal.h> #include <sys/wait.h> #include <errno.h> void callback (int num) { while (1 ) { pid_t pid = waitpid (-1 , NULL , WNOHANG); if (pid <= 0 ) { printf ("child process is running, or is waited\n" ); break ; } printf ("child process, pid = %d\n" , pid); } }int childWork (int cfd) ;int main () { int lfd = socket (AF_INET, SOCK_STREAM, 0 ); if (lfd == -1 ) { perror ("socket" ); exit (0 ); } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons (10000 ); addr.sin_addr.s_addr = INADDR_ANY; int ret = bind (lfd, (struct sockaddr*) &addr, sizeof (addr)); if (ret == -1 ) { perror ("bind" ); exit (0 ); } ret = listen (lfd, 128 ); if (ret == -1 ) { perror ("listen" ); exit (0 ); } struct sigaction act; act.sa_flags = 0 ; act.sa_handler = callback; sigemptyset (&act.sa_mask); sigaction (SIGCHLD, &act, NULL ); while (1 ) { struct sockaddr_in cliaddr; int clilen = sizeof (cliaddr); int cfd = accept (lfd, (struct sockaddr*)&cliaddr, &clilen); if (cfd == -1 ) { if (errno == EINTR) { continue ; } perror ("accept" ); exit (0 ); } char ip[24 ] = {0 }; printf ("client ip: %s, port: %d\n" , inet_ntop (AF_INET, &cliaddr.sin_addr.s_addr, ip, sizeof (ip)), ntohs (cliaddr.sin_port)); pid_t pid = fork(); if (pid == 0 ) { close (lfd); while (1 ) { int ret = childWork (cfd); if (ret <=0 ) { break ; } } close (cfd); exit (0 ); } else if (pid > 0 ) { close (cfd); } } return 0 ; }int childWork (int cfd) { char buf[1024 ]; memset (buf, 0 , sizeof (buf)); int len = read (cfd, buf, sizeof (buf)); if (len > 0 ) { printf ("read client data: %s\n" , buf); write (cfd, buf, len); } else if (len == 0 ) { printf ("client closed...\n" ); } else { perror ("read" ); } return len; }
上面的示例代码中,父子进程中分别关掉了用不到的文件描述符(父进程不需要通信,子进程也不需要监听)。 如果客户端主动断开连接,那么服务器端负责和客户端通信的子进程也就退出,子进程退出之后会给父进程发送一个叫做 SIGCHLD 的信号, 在父进程中通过 sigaction()
函数捕捉了该信号,通过回调函数 callback()
中的 waitpid()
对退出的子进程进行了资源回收。
多线程并发 编写多线程版的并发服务器程序和多进程思路差不多。 多线程中的线程有两大类:主线程(父线程)和子线程,它们分别要在服务器端处理监听和通信流程。 根据多进程的处理思路,就可以这样设计:
主线程: 负责监听:处理客户端的连接请求,也就是在父进程中循环调用 accept()
函数 创建子线程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信 回收子线程资源:由于回收需要调用阻塞函数,这样就会影响 accept()
,直接做线程分离。 子线程: 负责通信:基于主线程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送。 发送数据:send()/write() 接收数据:recv()/read()
在多线程版的服务器端程序中,多个线程共用同一个地址空间,有些数据是共享的,有些数据的独占的,下面来分析一些细节: (1)同一地址空间中的多个线程的栈空间是独占的 (2)多个线程共享全局数据区、堆区、内核区的文件描述符等资源,需要注意数据覆盖问题,并且多个线程访问共享资源的时候,需要进行线程同步。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <pthread.h> struct SockInfo { int fd; pthread_t tid; struct sockaddr_in addr; };struct SockInfo infos[128 ];void * working (void * arg) { struct SockInfo * pinfo = (struct SockInfo*)arg; char ip[32 ]; inet_ntop (AF_INET, &pinfo->addr.sin_addr.s_addr, ip, sizeof (ip)); unsigned short port = ntohs (pinfo->addr.sin_port); printf ("client ip: %s, port: %d\n" , ip, port); while (1 ) { char buf[1024 ]; memset (buf, 0 , sizeof (buf)); int len = read (pinfo->fd, buf, sizeof (buf)); if (len > 0 ) { printf ("read client data : %s\n" , buf); write (pinfo->fd, buf, strlen (buf)); } else if (len == 0 ) { printf ("client closed...\n" ); pinfo->fd = -1 ; break ; } else { perror ("read" ); pinfo->fd = -1 ; exit (-1 ); } } return NULL ; }int main () { int fd = socket (AF_INET, SOCK_STREAM, 0 ); if (fd == -1 ) { perror ("socket" ); exit (-1 ); } struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons (9999 ); saddr.sin_addr.s_addr = INADDR_ANY; int ret = bind (fd, (struct sockaddr*) &saddr, sizeof (saddr)); if (ret == -1 ) { perror ("bind" ); exit (-1 ); } ret = listen (fd, 128 ); if (ret == -1 ) { perror ("listen" ); exit (-1 ); } int max = sizeof (infos) / sizeof (infos[0 ]); for (int i = 0 ; i < max; ++i) { bzero (&infos[i], sizeof (infos[i])); infos[i].fd = -1 ; infos[i].tid = -1 ; } int addrlen = sizeof (struct sockaddr_in); while (1 ) { struct SockInfo * pinfo; for (int i = 0 ; i < max; ++i) { if (infos[i].fd == -1 ) { pinfo = &infos[i]; break ; } if (i == max - 1 ) { sleep (1 ); i--; } } int cfd = accept (fd, (struct sockaddr*) &pinfo->addr, &addrlen); printf ("parent thread, cfd: %d\n" , cfd); if (cfd == -1 ) { perror ("accept" ); break ; } pinfo->fd = cfd; pthread_create (&pinfo->tid, NULL , working, pinfo); pthread_detach (pinfo->tid); } close (fd); return 0 ; }
基于线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <pthread.h> #include "threadpool.h" struct SockInfo { int fd; pthread_t tid; struct sockaddr_in addr; };typedef struct PoolInfo { int fd; ThreadPool* pool; } PoolInfo;void working (void * arg) ;void acceptConn (void * arg) ;int main () { int fd = socket (AF_INET, SOCK_STREAM, 0 ); if (fd == -1 ) { perror ("socket" ); exit (-1 ); } struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons (9999 ); saddr.sin_addr.s_addr = INADDR_ANY; int ret = bind (fd, (struct sockaddr*) &saddr, sizeof (saddr)); if (ret == -1 ) { perror ("bind" ); exit (-1 ); } ret = listen (fd, 128 ); if (ret == -1 ) { perror ("listen" ); exit (-1 ); } ThreadPool* pool = threadPoolCreate (3 , 8 , 100 ); PoolInfo* info = (PoolInfo*)malloc (sizeof (PoolInfo)); info->pool = pool; info->fd = fd; threadPoolAdd (pool, acceptConn, info); pthread_exit (NULL ); return 0 ; }void acceptConn (void * arg) { PoolInfo* poolInfo = (PoolInfo*)arg; int addrlen = sizeof (struct sockaddr_in); while (1 ) { struct SockInfo * pinfo; pinfo = (struct SockInfo*)malloc (sizeof (struct SockInfo)); pinfo->fd = accept (poolInfo->fd, (struct sockaddr*) &pinfo->addr, &addrlen); if (pinfo->fd == -1 ) { perror ("accept" ); break ; } threadPoolAdd (poolInfo->pool, working, pinfo); } close (poolInfo->fd); }void working (void * arg) { struct SockInfo * pinfo = (struct SockInfo*)arg; char ip[32 ]; inet_ntop (AF_INET, &pinfo->addr.sin_addr.s_addr, ip, sizeof (ip)); unsigned short port = ntohs (pinfo->addr.sin_port); printf ("client ip: %s, port: %d\n" , ip, port); while (1 ) { char buf[1024 ]; memset (buf, 0 , sizeof (buf)); int len = read (pinfo->fd, buf, sizeof (buf)); if (len > 0 ) { printf ("read client data : %s\n" , buf); write (pinfo->fd, buf, strlen (buf)); } else if (len == 0 ) { printf ("client closed...\n" ); pinfo->fd = -1 ; break ; } else { perror ("read" ); pinfo->fd = -1 ; exit (-1 ); } } close (pinfo->fd); }
参考资料 https://subingwen.cn/linux/concurrence