Article source: blog.csdn.net/xilango/art…

This article is for beginners who are learning sockets, epoll, and multithreading.

A brief overview of library functions:

1. socket(): allocates a socket descriptor and its resources (creates a socket) according to the specified address family, socket type, and protocol.
   First argument: the address family (AF_INET is used in this article).
   Second argument: the socket type (SOCK_STREAM is used in this article).
   Third argument: the protocol (TCP is used in this article).
   Return value: the socket descriptor.

2. setsockopt(): sets an option on a socket of any type or state (used here for address reuse).
   First argument: the descriptor that identifies the socket.
   Second argument: the level at which the option is defined; SOL_SOCKET, IPPROTO_TCP, IPPROTO_IP, and IPPROTO_IPV6 are supported.
   Third argument: the option to set.
   Fourth argument: pointer to the buffer that holds the new value of the option.
   Fifth argument: length of the optval buffer.

3. bind(): associates a local address with a socket by assigning a local name (host address and port number) to an unnamed socket.
   First argument: the socket descriptor.
   Second argument: pointer to the address structure (a sockaddr_in here, carrying the address and port to bind).
   Third argument: length of the address structure.

4. listen(): marks a bound socket as a listening socket that waits for incoming connection requests.
   First argument: the descriptor (return value of socket()) of a bound, unconnected socket.
   Second argument: the maximum length of the queue of pending connections.

5. accept(): accepts a connection on a socket.
   First argument: the socket descriptor that is listening for connections after listen().
   Second argument: pointer to a buffer that receives the address of the connecting entity as known to the communication layer. Its actual format is determined by the address family chosen when the socket was created.
   Third argument: an input/output pointer used together with addr; it points to an integer containing the length of the addr buffer.
   Return value: a new socket for the accepted connection.

6. fcntl(): changes the properties of an open file. For example, it can make accept(), recv(), and send() non-blocking.
   First argument: the file descriptor to operate on.
   Second argument: the command to perform:
   F_DUPFD: finds the smallest unused file descriptor greater than or equal to arg and makes it a copy of fd. The new descriptor is returned on success. It shares the same file table entry as fd but has its own file descriptor flags, with FD_CLOEXEC cleared. See dup2().
   F_GETFD: retrieves the close-on-exec flag. If the FD_CLOEXEC bit is 0, the file is not closed when an exec() family function is called.
   F_SETFD: sets the close-on-exec flag from the FD_CLOEXEC bit of arg.
   F_GETFL: gets the file status flags, that is, the flags argument passed to open().
   F_SETFL: sets the file status flags to arg. Only flags such as O_APPEND, O_NONBLOCK, and O_ASYNC can be changed; other bits are not affected.
   F_GETLK: gets the file lock status.
   F_SETLK: sets the file lock status. The l_type field of the flock structure must be F_RDLCK, F_WRLCK, or F_UNLCK. If the lock cannot be established, -1 is returned with the error code EACCES or EAGAIN.
   F_SETLKW: same as F_SETLK, except that when the lock cannot be established the call waits until it can. If a signal interrupts the wait, -1 is returned immediately with the error code EINTR.
   Third argument: for the lock commands, a pointer to a flock structure:
   l_type has three values: F_RDLCK establishes a read lock, F_WRLCK establishes a write lock, and F_UNLCK removes a previously established lock.
   l_whence has three values: SEEK_SET (the lock position is relative to the start of the file), SEEK_CUR (relative to the current read/write position), and SEEK_END (relative to the end of the file).
   l_start is the offset relative to l_whence; together they determine where the locked region starts.
   l_len is the length of the locked region. A value of 0 means the region runs from the starting position (determined by l_whence and l_start) to the largest possible offset, so data appended later is also covered by the lock.
   Return value: a command-dependent value on success, or -1 on error with the cause stored in errno.
   Example:
       flags = fcntl(sockfd, F_GETFL, 0);              // get the file status flags
       fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);     // set non-blocking mode
       flags = fcntl(sockfd, F_GETFL, 0);
       fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);    // set blocking mode

7. epoll_create(): creates an epoll handle (a file descriptor). The size argument must be greater than 0.
   Return value: the epoll file descriptor.

8. epoll_ctl(): registers, modifies, or removes the events monitored for a descriptor. Unlike select(), which is told what to listen for each time it waits, epoll registers the event types in advance.
   First argument: the return value of epoll_create().
   Second argument: the action, one of three macros: EPOLL_CTL_ADD registers a new fd with the epfd; EPOLL_CTL_MOD modifies the events monitored for a registered fd; EPOLL_CTL_DEL removes an fd from the epfd.
   Third argument: the socket to monitor.
   Fourth argument: tells the kernel which events to listen for.
   Return value: 0 on success, -1 on failure.

9. epoll_wait(): waits for events on the epoll instance.
   First argument: the epoll file descriptor (return value of epoll_create()).
   Second argument: the array that receives the triggered events from the kernel.
   Third argument: the maximum number of events to return at a time, which tells the kernel how large the event array is.
   Fourth argument: the timeout in milliseconds; 0 returns immediately and -1 blocks indefinitely.
   Return value: the number of events that need to be processed; 0 indicates a timeout.

10. pthread_mutex_lock(): locks a mutex. If the mutex is already locked and owned by another thread, the calling thread blocks until the mutex becomes available.
   Parameter: a pointer to the mutex.
   Return value: zero on success.

11. pthread_mutex_unlock(): unlocks a mutex; it is paired with pthread_mutex_lock().

12. pthread_create(): creates a thread.
   First argument: pointer to the thread identifier (thread ID).
   Second argument: the thread attributes.
   Third argument: the address of the function the thread starts running.
   Fourth argument: the argument passed to that function.
   Return value: zero on success.
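The three epoll calls in the overview fit together as create, register, wait. The following is a minimal sketch of that sequence rather than part of the article's listing; the helper name `count_ready` is hypothetical, and it assumes `fd` is any descriptor that can become readable, such as a pipe or a socket.

```cpp
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical helper: reports whether fd has a pending read event.
// Returns the epoll_wait() result (0 on timeout, 1 if readable), or -1 on error.
int count_ready(int fd, int timeout_ms)
{
    int epfd = epoll_create(1);              // size is only a hint, but must be > 0
    if (epfd < 0)
        return -1;

    struct epoll_event ev = {};
    ev.events = EPOLLIN;                     // level-triggered readability
    ev.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0)
    {
        close(epfd);
        return -1;
    }

    struct epoll_event ready[1];
    int n = epoll_wait(epfd, ready, 1, timeout_ms);
    close(epfd);                             // closing the epoll fd drops the registration
    return n;
}
```

Called with a timeout of 0, the helper only polls; a real server would normally create the epoll instance once and reuse it, as the listing below does.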

```cpp
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h>

#define NUMBER 10          // number of worker threads
#define LISTENMAX 20       // listen() backlog
#define IP "127.0.0.1"
#define PORT 8000          // must fit in 16 bits (1 to 65535)
#define LINET 10000        // maximum number of tracked sockets and epoll events

// Thread parameters: [0] = busy flag, [1] = client socket, [7] = thread index
static unsigned int threadParameter[NUMBER][8];
pthread_t threadId[NUMBER];              // thread IDs
pthread_mutex_t threadLock[NUMBER];      // thread locks
pthread_cond_t count_nonzero[NUMBER];    // one condition variable per thread
int count1[NUMBER] = {0};                // 1 = work is pending for thread i

static struct dataPacket
{
    struct epoll_event ev;
    struct epoll_event waitEvent[LINET];
    int sockNumber[LINET] = {0};         // tracked sockets; 0 marks a free slot
    int MAX = 0;                         // number of connected clients
    int epfd = 0;                        // epoll file descriptor
} ThreaDataPackage;

// Block worker i until the main thread hands it a socket.
void decrement_count(int i)
{
    pthread_mutex_lock(threadLock + i);
    while (count1[i] == 0)
        pthread_cond_wait(count_nonzero + i, threadLock + i);
    count1[i] = 0;
    pthread_mutex_unlock(threadLock + i);
}

// Wake worker i.
void increment_count(int i)
{
    pthread_mutex_lock(threadLock + i);
    count1[i] = 1;
    pthread_cond_signal(count_nonzero + i);
    pthread_mutex_unlock(threadLock + i);
}

void *serverSocket(void *arg)   // thread main function
{
    unsigned int *parameter = (unsigned int *)arg;
    char buf[1024];
    char buff[1100];
    pthread_detach(pthread_self());
    while (1)
    {
        decrement_count(parameter[7]);
        int clientFd = (int)parameter[1];
        printf("Start thread: %u\n", parameter[7]);
        memset(buf, 0, sizeof(buf));
        memset(buff, 0, sizeof(buff));
        // The socket is non-blocking, so recv() returns immediately
        int len = recv(clientFd, buf, sizeof(buf) - 1, 0);
        if (len > 0)
        {
            printf("%s\n", buf);
            snprintf(buff, sizeof(buff), "Hello client, I am thread %u. You sent: %s", parameter[7], buf);
            // MSG_NOSIGNAL avoids SIGPIPE if the peer has already gone away
            send(clientFd, buff, strlen(buff), MSG_NOSIGNAL);
        }
        else if (len == 0)   // the peer closed the connection
        {
            for (int i = 0; i < LINET; i++)
            {
                if (clientFd == ThreaDataPackage.sockNumber[i])
                {
                    // Remove the socket from the epoll set before closing it
                    if (epoll_ctl(ThreaDataPackage.epfd, EPOLL_CTL_DEL, clientFd, &ThreaDataPackage.ev) < 0)
                    {
                        perror("epoll_ctl error:");
                    }
                    close(clientFd);
                    ThreaDataPackage.sockNumber[i] = 0;
                    ThreaDataPackage.MAX--;
                    printf("client %d offline\n", ThreaDataPackage.MAX);
                    break;
                }
            }
        }
        parameter[0] = 0;   // set the thread occupancy flag to "free"
    }
    return NULL;
}

static int initThreadPool(void)   // initializes the per-thread data and starts the workers
{
    int a = 0;
    for (int i = 0; i < NUMBER; i++)
    {
        threadParameter[i][0] = 0;
        threadParameter[i][7] = i;
        pthread_cond_init(count_nonzero + i, NULL);
        pthread_mutex_init(threadLock + i, NULL);
        a = pthread_create(threadId + i, NULL, serverSocket, (void *)(threadParameter[i]));
        if (a != 0)
        {
            // pthread_create() returns the error number instead of setting errno
            fprintf(stderr, "pthread_create error: %s\n", strerror(a));
            return -1;
        }
    }
    return 0;
}

static int initListen(const char *ip, int port, int listenMax)
{
    int a = 0;
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("socket error:");
        return -1;
    }
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &(server_addr.sin_addr));
    server_addr.sin_port = htons(port);
    int opt = 1;
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&opt, sizeof(opt));
    a = bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (a < 0)
    {
        perror("bind error:");
        close(sockfd);
        return -1;
    }
    a = listen(sockfd, listenMax);
    if (a < 0)
    {
        perror("listen error:");
        close(sockfd);
        return -1;
    }
    return sockfd;
}

bool setNonBlock(int fd)   // sets the file descriptor to non-blocking
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
    {
        return false;
    }
    flags |= O_NONBLOCK;
    if (-1 == fcntl(fd, F_SETFL, flags))
    {
        return false;
    }
    return true;
}

int main()
{
    int acceptSockfd = 0;            // socket returned by accept()
    int sockfd = 0;                  // server (listening) socket
    int nfds = 0;                    // number of events triggered
    socklen_t addrLen;               // address information length
    struct sockaddr_in clientAddr;   // IPv4 address structure
    if (0 != initThreadPool())
    {
        fprintf(stderr, "initThreadPool error\n");
        exit(-1);
    }
    sockfd = initListen(IP, PORT, LISTENMAX);
    if (sockfd < 0)
    {
        fprintf(stderr, "initListen error\n");
        exit(-1);
    }
    ThreaDataPackage.sockNumber[0] = sockfd;
    ThreaDataPackage.epfd = epoll_create(8);           // create the epoll file descriptor
    ThreaDataPackage.ev.events = EPOLLIN | EPOLLET;    // readable, in epoll's edge-triggered (ET) mode
    ThreaDataPackage.ev.data.fd = sockfd;
    if (epoll_ctl(ThreaDataPackage.epfd, EPOLL_CTL_ADD, sockfd, &ThreaDataPackage.ev) < 0)   // add the event to the epoll set
    {
        perror("epoll_ctl error:");
        exit(-1);
    }
    while (1)
    {
        nfds = epoll_wait(ThreaDataPackage.epfd, ThreaDataPackage.waitEvent, LINET, -1);
        printf("nfds::%d\n", nfds);
        for (int i = 0; i < nfds; i++)
        {
            if ((sockfd == ThreaDataPackage.waitEvent[i].data.fd) && (ThreaDataPackage.waitEvent[i].events & EPOLLIN))
            {
                // New connection on the listening socket: store it in the first free slot
                addrLen = sizeof(struct sockaddr_in);
                memset(&clientAddr, 0, addrLen);
                for (int j = 0; j < LINET; j++)
                {
                    if (ThreaDataPackage.sockNumber[j] == 0)
                    {
                        acceptSockfd = accept(sockfd, (struct sockaddr *)&clientAddr, &addrLen);
                        if (acceptSockfd < 0)
                        {
                            perror("accept error:");
                            break;
                        }
                        ThreaDataPackage.sockNumber[j] = acceptSockfd;
                        ThreaDataPackage.ev.data.fd = acceptSockfd;
                        ThreaDataPackage.ev.events = EPOLLIN | EPOLLET;
                        if (epoll_ctl(ThreaDataPackage.epfd, EPOLL_CTL_ADD, acceptSockfd, &ThreaDataPackage.ev) < 0)   // add the client to the epoll set
                        {
                            perror("epoll_ctl error:");
                            exit(-1);
                        }
                        setNonBlock(acceptSockfd);   // set it to non-blocking
                        ThreaDataPackage.MAX++;
                        printf("client %d live\n", ThreaDataPackage.MAX);
                        break;
                    }
                }
            }
            // Client sockets only (descriptors 0 to 3 are stdio and, typically, the listening socket)
            else if (ThreaDataPackage.waitEvent[i].data.fd > 3 && ((ThreaDataPackage.waitEvent[i].events & (EPOLLIN | EPOLLERR)) == EPOLLIN))
            {
                // Hand the readable socket to the first free worker thread
                for (int j = 0; j < NUMBER; j++)
                {
                    if (0 == threadParameter[j][0])
                    {
                        threadParameter[j][0] = 1;   // set the occupancy flag to "busy"
                        threadParameter[j][1] = ThreaDataPackage.waitEvent[i].data.fd;   // client socket
                        increment_count(j);
                        break;
                    }
                }
            }
        }
    }
    return 0;
}
```
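The listing registers client sockets with EPOLLET and makes them non-blocking. In edge-triggered mode a readiness change is reported only once, so a common pattern is to keep calling recv() until it reports EAGAIN. The sketch below illustrates that pattern; the helper name `drain_socket` is hypothetical and is not part of the original listing, which reads once per event.

```cpp
#include <errno.h>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>

// Hypothetical helper: reads everything currently queued on a non-blocking socket.
// Returns 1 if the socket is drained and still open, 0 if the peer closed it,
// and -1 on any other error. Received bytes are appended to out.
int drain_socket(int fd, std::string &out)
{
    char buf[1024];
    for (;;)
    {
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if (n > 0)
        {
            out.append(buf, (size_t)n);      // keep the bytes and try again
        }
        else if (n == 0)
        {
            return 0;                        // orderly shutdown by the peer
        }
        else if (errno == EAGAIN || errno == EWOULDBLOCK)
        {
            return 1;                        // nothing left to read for now
        }
        else if (errno != EINTR)
        {
            return -1;                       // real error; EINTR simply retries
        }
    }
}
```

A worker thread could call such a helper in place of its single recv(), assuming the socket was made non-blocking before it was handed over.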
• Relationships among blocking, non-blocking, synchronous, and asynchronous


One: blocking and non-blocking

Blocking and non-blocking are concerned with the state of the program while it waits for the result of the call (message, return value).


A blocking call suspends the current thread until the result of the call is returned; the calling thread does not continue until it gets the result. A non-blocking call returns immediately even when the result is not yet available, so the current thread is not suspended.

1. Blocking IO:

1.1: A blocking call suspends the current thread until the result of the call is returned; the function returns only after the result is obtained. One might equate blocking calls with synchronous calls, but they are different.

1.2: With a synchronous call, the current thread is often still active; it is only logically that the current function has not returned. The socket receive function recv is an example of a blocking call.

1.3: If the socket is in blocking mode and no data is available, the current thread is suspended until data arrives.

Figure 1 shows the flow of the recv function receiving data.

When the user process invokes the recv system call, the kernel begins the first stage of IO: preparing the data. For network IO, the data has often not arrived yet, so the kernel waits for it. On the user side, the entire process is blocked. Once the data is ready, the kernel copies it from the NIC buffer to the system buffer and then to the user-specified buffer. The kernel then returns the result, and the user process unblocks and runs again. So the defining characteristic of blocking IO is that it blocks during both stages of IO execution.

2. Non-blocking IO:

On Linux, you can set a socket to non-blocking mode.

2.1: A non-blocking call returns immediately even if the result is not yet available; it does not suspend the current thread.

2.2: Whether an object is in blocking mode and whether a function call blocks are strongly correlated, but there is not a one-to-one correspondence.

2.3: A blocking object can still be used through non-blocking calls: we can poll its state through certain APIs and call the blocking function only at the right moment, so that it does not block. Conversely, a non-blocking object can be used through special functions that do block. The select function is one such example.

Figure 2 shows what happens once the socket has been set to non-blocking. When the user process issues a recv operation and the data in the kernel is not ready, the kernel does not block the user process but immediately returns an error. From the user process's perspective, recv does not wait; it gets a result immediately. When the user process sees that the result is an error, it knows that the data is not ready, so it can issue recv again. Once the data in the kernel is ready and another recv call arrives from the user process, the kernel copies the data into user memory and returns. So the user process has to keep actively asking the kernel whether the data is ready.
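The "immediate error" behaviour can be observed directly. The following is a minimal sketch, assuming Linux and two hypothetical helpers, `make_nonblocking` and `try_recv`, that are not part of the article's listing; the value -2 is an arbitrary marker chosen here for "no data yet".

```cpp
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>

// Switches fd to non-blocking mode; returns false if fcntl() fails.
bool make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Hypothetical helper: one non-blocking receive attempt.
// Returns the byte count, 0 if the peer closed, -1 on a real error,
// and -2 when no data is ready yet (the caller should ask again later).
long try_recv(int fd, char *buf, size_t cap)
{
    ssize_t n = recv(fd, buf, cap, 0);
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return -2;
    return (long)n;
}
```

A caller that loops on `try_recv` until it stops returning -2 is doing exactly the active asking described above, which is why such loops are usually paired with select or epoll rather than run on their own.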

In fact, this process is much like blocking IO: the process still has to keep asking the kernel, and once the data is ready, copying it into user memory blocks just as it does in the blocking case.

Figure 3 takes select as an example. Each time, we put all the file descriptors into the set and check them with select; when data arrives, the descriptors that are ready remain in the set. The efficiency of this approach drops as the number of online users grows.
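The select pattern just described can be sketched as follows. The helper name `select_readable` is hypothetical, and the sketch assumes every descriptor is below FD_SETSIZE. The two loops show why the cost grows with the number of descriptors: the set is rebuilt before each call and scanned in full after it.

```cpp
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical helper: checks a list of descriptors with select().
// Returns how many are readable within timeout_ms, or -1 on error;
// ready[i] is set to true for each readable fds[i].
int select_readable(const int *fds, int count, int timeout_ms, bool *ready)
{
    fd_set readSet;
    FD_ZERO(&readSet);
    int maxFd = -1;
    for (int i = 0; i < count; i++)        // the whole set is rebuilt on every call
    {
        FD_SET(fds[i], &readSet);
        if (fds[i] > maxFd)
            maxFd = fds[i];
    }

    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    int n = select(maxFd + 1, &readSet, nullptr, nullptr, &tv);
    if (n < 0)
        return -1;
    for (int i = 0; i < count; i++)        // every descriptor is checked again afterwards
        ready[i] = FD_ISSET(fds[i], &readSet) != 0;
    return n;
}
```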

Two: synchronous and asynchronous

Synchronization is the relationship between two objects, while blocking is the state of an object.

Synchronous and asynchronous are concerned with the message communication mechanism (synchronous communication / asynchronous communication). Synchronous means that when a call is issued, the call does not return until the result is available; once the call returns, you have the return value. In other words, the caller actively waits for the result of the call.

Asynchronous is the opposite: the call returns right after it is issued, without a result. In other words, when an asynchronous procedure call is made, the caller does not get the result immediately. Instead, after the call is issued, the callee notifies the caller through status, notifications, or a callback function.

3. Synchronous IO:

Synchronous means that the call must produce a result before it returns, and the caller waits until it gets that result. Some may ask whether a non-blocking call is also synchronous. It is: a non-blocking call still returns a result, except that the result may indicate that no data is available or that an error occurred.

Both the blocking IO and the non-blocking IO described above are synchronous.

4. Asynchronous IO:

As shown in Figure 5, once the user process initiates the recv operation, it can go on to do other things. From the kernel's perspective, when it receives an asynchronous recv, it returns immediately, so the user process is never blocked. The kernel then waits for the data to be ready and copies it to user memory. When all this is done, the kernel sends a signal to the user process telling it that the recv operation is complete.

There is a clear difference between non-blocking IO and asynchronous IO. In non-blocking IO, although the process is not blocked most of the time, it still has to check actively, and when the data is ready it must call recv again to copy the data to user memory. Asynchronous IO is completely different. It is as if the user process hands the entire IO operation to someone else (the kernel), who then signals when it is done. During this time, the user process does not need to check the status of the IO operation or copy the data itself.

To sum up: many people confuse asynchronous with non-blocking because asynchronous operations generally do not block at the actual IO operation. For example, with the select function, a read attempted after select reports the descriptor as readable will not block. It is like queuing with a number: when your number is called there is nobody ahead of you, so you are served as soon as you reach the counter. As you can see, synchronous/asynchronous and blocking/non-blocking are two different pairs of concepts that can be combined. Many people also confuse synchronous with blocking because they do not distinguish the two. For example, a blocking read/write combines message notification with message processing: the message of interest is whether the fd is readable/writable, and processing the message is the read/write on the fd. When we set the fd to non-blocking, the read/write no longer blocks waiting for that notification and returns immediately if the fd is not readable/writable.