In RTMP (1), we set up a server and used existing tools to push and play live streams. In this article we build an RTMP push/pull stream SDK from scratch, to gain a real “deep understanding” of how the protocol is implemented.

As coders, once we stack bricks to a certain height we need scaffolding to hold us up. So we first implement the RTMP push/pull stream SDK as a command-line program on the PC. Once development and debugging are stable, we can quickly build it for Android, iOS and other mobile platforms with cross-compilation toolchains.

1. Create a project

Install CMake with brew install cmake, then create CMakeLists.txt in the rtmpsdk project directory:

# Specify the minimum CMake version
cmake_minimum_required(VERSION 3.6)

project(rtmpsdk)

set(CMAKE_INSTALL_PREFIX "${CMAKE_BINARY_DIR}" CACHE PATH "Installation directory" FORCE)
message(STATUS "CMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -ffunction-sections -fdata-sections -Os")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -ffunction-sections -fdata-sections -Os")

set(SRC_PREFIX "src")

set(SELF_LIBS_OUT ${CMAKE_SYSTEM_NAME}.out)

# Let sources include project headers such as <bs_socket.hpp>.
include_directories(${SRC_PREFIX})

# Compile every .cpp under src (only main.cpp for now; more files are added later).
file(GLOB SELF_SRC_FILES
    ${SRC_PREFIX}/*.cpp
    )

add_executable(${PROJECT_NAME} ${SELF_SRC_FILES})

Create a src directory and add a main.cpp file to it:

#include <iostream>

int main(int argc, char* argv[])
{
    std::cout << "Hello rtmp server!" << std::endl;
    return 0;
}

Create a cmake_build directory under rtmpsdk to hold the build output. In a terminal, go to the project directory and run:

mkdir cmake_build
cd cmake_build

Then execute:

cmake ..
make 

The intermediate build files and the final rtmpsdk executable are generated in cmake_build. Run ./rtmpsdk:

$ ./rtmpsdk 
Hello rtmp server!

The program prints “Hello rtmp server!”, so the build environment is set up and we can move on to implementing the functionality.

Note: my development environment is macOS. For Windows, I will provide a Docker CentOS image later to serve as the project's build environment.

2. Encapsulate interfaces

What interfaces should our RTMP SDK provide, and which data structures should it encapsulate?

  1. RTMP runs over TCP, so to reach the server we must create a TCP socket. We therefore need an interface that creates a handle from the URL: rtmp_t rtmp_create(const char* url)
  2. After creating the socket we need some configuration. The most basic is the read/write timeout: if the socket never times out, a blocking read or write may never return and the program cannot exit cleanly. So we provide an interface to set the read/write timeouts: int rtmp_set_timeout(rtmp_t rtmp, int recv_timeout_ms, int send_timeout_ms)
  3. RTMP starts with a handshake, so we need a handshake interface: int rtmp_handshake(rtmp_t rtmp)
  4. After a successful handshake, we connect to the application on the server: int rtmp_connect_app(rtmp_t rtmp)
  5. After connecting, we tell the server whether we want to pull (play) or push (publish) a stream, with two functions: int rtmp_play_stream(rtmp_t rtmp) and int rtmp_publish_stream(rtmp_t rtmp)
  6. Then we can start pulling or pushing packets: int rtmp_read_packet(rtmp_t rtmp, char* type, uint32_t* timestamp, char** data, int* size) and int rtmp_write_packet(rtmp_t rtmp, char type, uint32_t timestamp, char* data, int size)
  7. When pulling or pushing ends, we destroy the handle to release its resources: void rtmp_destroy(rtmp_t rtmp)

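Taking playback as an example, the calls are made in this order: rtmp_create, rtmp_set_timeout, rtmp_handshake, rtmp_connect_app, rtmp_play_stream, then rtmp_read_packet in a loop, and finally rtmp_destroy. The header file declaring these interfaces is as follows: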


#ifndef LIB_RTMP_HPP
#define LIB_RTMP_HPP

/**
 * rtmpsdk is a librtmp like library,
 * used to play/publish rtmp stream from/to rtmp server.
 * socket: use sync and block socket to connect/recv/send data with server.
 * depends: no need other libraries; depends on ssl if use complex_handshake.
 * thread-safe: no
 */

#ifndef __STDC_FORMAT_MACROS
    #define __STDC_FORMAT_MACROS
#endif

#include <stdint.h>
#include <sys/types.h>

#ifdef __cplusplus
extern "C"{
#endif

/*************************************************************
 * RTMP protocol context
 *************************************************************/
// the RTMP handler.
typedef void* rtmp_t;

/**
 * Create a RTMP handler.
 * @param url The RTMP url, for example, rtmp://localhost/live/livestream
 * @remark default timeout to 30s if not set by rtmp_set_timeout.
 * @remark default schema to url_schema_normal, use rtmp_set_schema to change it.
 *
 * @return a rtmp handler, or NULL if error occurred.
 */
extern rtmp_t rtmp_create(const char* url);

/**
 * set socket timeout
 * @param recv_timeout_ms the timeout for receiving messages in ms.
 * @param send_timeout_ms the timeout for sending message in ms.
 * @remark user can set timeout once rtmp_create,
 *      or before rtmp_handshake or rtmp_dns_resolve to connect to server.
 * @remark default timeout to 30s if not set by rtmp_set_timeout.
 *
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_set_timeout(rtmp_t rtmp, int recv_timeout_ms, int send_timeout_ms);

/**
 * close and destroy the rtmp stack.
 * @remark, user should never use the rtmp again.
 */
extern void rtmp_destroy(rtmp_t rtmp);

/*************************************************************
 * RTMP protocol stack
 *************************************************************/
/**
 * connect and handshake with server
 * category: publish/play
 * previous: rtmp-create
 * next: connect-app
 *
 * @return 0, success; otherwise, failed.
 */
/**
 * simple handshake specified in RTMP 1.0,
 * does not depend on SSL.
 */
/**
 * rtmp_handshake equals to invoke:
 *       rtmp_dns_resolve()
 *       rtmp_connect_server()
 *       rtmp_do_simple_handshake()
 * user can use these functions if needed.
 */
extern int rtmp_handshake(rtmp_t rtmp);


/**
 * Connect to RTMP tcUrl(Vhost/App), similar to flash AS3 NetConnection.connect(tcUrl).
 * @remark When connected to server, user can retrieve information from RTMP handler,
 *      for example, use rtmp_get_server_id to get server ip/pid/cid.
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_connect_app(rtmp_t rtmp);


/**
 * play a live/vod stream.
 * category: play
 * previous: connect-app
 * next: destroy
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_play_stream(rtmp_t rtmp);

/**
 * publish a live stream.
 * category: publish
 * previous: connect-app
 * next: destroy
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_publish_stream(rtmp_t rtmp);

/**
 * E.4.1 FLV Tag, Page 75
 */
// 8 = audio
#define RTMP_TYPE_AUDIO 8
// 9 = video
#define RTMP_TYPE_VIDEO 9
// 18 = script data
#define RTMP_TYPE_SCRIPT 18

/**
 * read an audio/video/script-data packet from rtmp stream.
 * @param type, output the packet type, macros:
 *            RTMP_TYPE_AUDIO, FlvTagAudio
 *            RTMP_TYPE_VIDEO, FlvTagVideo
 *            RTMP_TYPE_SCRIPT, FlvTagScript
 *            otherwise, invalid type.
 * @param timestamp, in ms, overflow in 50days
 * @param data, the packet data, according to type:
 *            FlvTagAudio, @see "E.4.2.1 AUDIODATA"
 *            FlvTagVideo, @see "E.4.3.1 VIDEODATA"
 *            FlvTagScript, @see "E.4.4.1 SCRIPTDATA"
 *            User can free the packet by rtmp_free_packet.
 * @param size, size of packet.
 *
 * @remark: for read, user must free the data.
 * @remark: for write, user should never free the data, even if error.
 *
 * @return 0, success; otherwise, failed.
 */
extern int rtmp_read_packet(rtmp_t rtmp, char* type, uint32_t* timestamp, char** data, int* size);
// @param data User should never free it anymore.
extern int rtmp_write_packet(rtmp_t rtmp, char type, uint32_t timestamp, char* data, int size);


#ifdef __cplusplus
}
#endif

#endif

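To make the call order concrete, here is a hedged sketch of a play session written against the declarations above. So that it compiles on its own, the rtmp_* functions are replaced by trivial hypothetical stubs (not the SDK's real implementation): every step succeeds, and rtmp_read_packet() hands out three fake, malloc-allocated video packets before failing the way a closed stream would. With the real SDK you would include the header and link the library instead of defining the stubs.

```cpp
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <cassert>

typedef void* rtmp_t;

// Packet types, copied from the header above (FLV tag types).
#define RTMP_TYPE_AUDIO 8
#define RTMP_TYPE_VIDEO 9
#define RTMP_TYPE_SCRIPT 18

// ---- Hypothetical stubs standing in for the real SDK (illustration only) ----
static int g_packets_left = 0;   // fake packets the stub stream still has
static int g_dummy_handle = 0;   // gives rtmp_create() a non-NULL handle

rtmp_t rtmp_create(const char* /*url*/) { g_packets_left = 3; return &g_dummy_handle; }
int rtmp_set_timeout(rtmp_t, int, int) { return 0; }
int rtmp_handshake(rtmp_t) { return 0; }
int rtmp_connect_app(rtmp_t) { return 0; }
int rtmp_play_stream(rtmp_t) { return 0; }
int rtmp_read_packet(rtmp_t, char* type, uint32_t* timestamp, char** data, int* size) {
    if (g_packets_left == 0) {
        return -1;                                  // behave like a closed stream
    }
    *type = RTMP_TYPE_VIDEO;
    *timestamp = (uint32_t)(3 - g_packets_left) * 40;  // fake 25 fps timestamps
    *size = 4;
    *data = (char*)malloc((size_t)*size);           // the caller owns this buffer
    if (*data) {
        memset(*data, 0, (size_t)*size);
    }
    --g_packets_left;
    return 0;
}
void rtmp_destroy(rtmp_t) {}

// ---- The play sequence, in the order the header documents ----
// Returns the number of packets read, or -1 if a setup step failed.
int play_session(const char* url) {
    rtmp_t rtmp = rtmp_create(url);
    if (rtmp == NULL) {
        return -1;
    }
    int count = -1;
    if (rtmp_set_timeout(rtmp, 3000, 3000) == 0    // 3 s recv/send timeouts
        && rtmp_handshake(rtmp) == 0
        && rtmp_connect_app(rtmp) == 0
        && rtmp_play_stream(rtmp) == 0) {
        count = 0;
        char type = 0;
        uint32_t timestamp = 0;
        char* data = NULL;
        int size = 0;
        // Read until the stream ends or an error occurs.
        while (rtmp_read_packet(rtmp, &type, &timestamp, &data, &size) == 0) {
            ++count;
            free(data);   // for read, the user must free the data
        }
    }
    rtmp_destroy(rtmp);
    return count;
}
```

The loop frees every packet because, per the header, data returned by a read belongs to the caller. Using free() assumes the SDK allocates with malloc; the header also mentions rtmp_free_packet, which is not declared here.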

Now that the interfaces are defined, we can implement them step by step, starting with rtmp_create, which creates the socket from the URL.
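rtmp_create() has to split the URL into the pieces the later steps need: host and port for the socket, the app for rtmp_connect_app, and the stream name for play/publish. The article does not show its parser, so the following is a minimal hypothetical sketch (parse_rtmp_url and RtmpUrl are illustrative names). It assumes the form rtmp://host[:port]/app/stream, defaults the port to 1935 (the standard RTMP port), and ignores bracketed IPv6 hosts and query parameters.

```cpp
#include <string>
#include <cstdlib>
#include <cassert>

// Hypothetical result type: the pieces rtmp_create() needs from the URL.
struct RtmpUrl {
    std::string host;
    int port;
    std::string app;
    std::string stream;
};

// Parses rtmp://host[:port]/app/stream. Returns false if the URL does not match.
bool parse_rtmp_url(const std::string& url, RtmpUrl* out) {
    const std::string scheme = "rtmp://";
    if (url.compare(0, scheme.size(), scheme) != 0) {
        return false;
    }
    std::string rest = url.substr(scheme.size());

    // Split "host[:port]" from "app/stream".
    size_t slash = rest.find('/');
    if (slash == std::string::npos) {
        return false;
    }
    std::string hostport = rest.substr(0, slash);
    std::string path = rest.substr(slash + 1);

    out->port = 1935;  // default RTMP port
    size_t colon = hostport.find(':');
    if (colon == std::string::npos) {
        out->host = hostport;
    } else {
        out->host = hostport.substr(0, colon);
        out->port = std::atoi(hostport.c_str() + colon + 1);
    }

    // Everything before the last '/' is the app; the rest is the stream name.
    size_t last = path.rfind('/');
    if (last == std::string::npos) {
        return false;
    }
    out->app = path.substr(0, last);
    out->stream = path.substr(last + 1);
    return !out->host.empty() && out->port > 0 && !out->app.empty();
}
```

Treating everything before the last '/' as the app keeps multi-level app paths intact; a production parser would also need to handle vhost parameters and IPv6 literals.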

3. Encapsulate network interfaces

Before wrapping the network interfaces, let's review Linux C network programming.

3.1 Basic flow of Linux C socket programming

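The overall flow is: the server calls socket(), bind(), listen() and accept(); the client calls socket() and connect(); once connected, both sides exchange data with read()/write() and finally call close().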

Our RTMP SDK acts as a TCP client. Let's look at the Linux C socket API.

3.1.1 socket()

The function prototype
int socket(int domain, int type, int protocol);
Parameters
  • domain: the protocol domain, also called the protocol family. Common families include AF_INET, AF_INET6, AF_LOCAL (or AF_UNIX, the Unix domain socket) and AF_ROUTE. The family determines the socket's address type, and communication must use addresses of that type: AF_INET uses a 32-bit IPv4 address combined with a 16-bit port number, while AF_UNIX uses an absolute pathname.
  • type: the socket type. Common types include SOCK_STREAM, SOCK_DGRAM, SOCK_RAW, SOCK_PACKET and SOCK_SEQPACKET. A stream socket (SOCK_STREAM) is connection-oriented and is used by TCP applications; a datagram socket (SOCK_DGRAM) is connectionless and is used by UDP applications.
  • protocol: the protocol to use. Common protocols include IPPROTO_TCP, IPPROTO_UDP, IPPROTO_SCTP and IPPROTO_TIPC, which correspond to the TCP, UDP, SCTP and TIPC transport protocols respectively.

Note: type and protocol cannot be combined arbitrarily; for example, SOCK_STREAM cannot be combined with IPPROTO_UDP. When protocol is 0, the system automatically selects the default protocol for the given type.

Return value

On success, socket() returns the descriptor of the newly created socket. On failure it returns INVALID_SOCKET on Windows, or -1 on Linux with errno set to indicate the error.

A socket descriptor is an integer. Each process has a descriptor table in its own process space (on Linux, the ordinary file descriptor table) that maps each descriptor to the corresponding socket data structure, so the socket can be found from its descriptor. The table is per process, while the socket data structures themselves live in the operating system kernel.
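A quick check of the points above: with protocol set to 0 the kernel picks TCP for an AF_INET stream socket, and an invalid type/protocol pair such as SOCK_STREAM with IPPROTO_UDP makes socket() fail. This is a small illustrative sketch; the helper names are hypothetical.

```cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cassert>

// Create an IPv4 TCP socket; protocol 0 selects the default (TCP) for SOCK_STREAM.
// Returns the descriptor, or -1 on failure (errno says why).
int create_tcp_socket() {
    return ::socket(AF_INET, SOCK_STREAM, 0);
}

// Ask the kernel which type a socket has (SOCK_STREAM, SOCK_DGRAM, ...).
int socket_type(int fd) {
    int type = 0;
    socklen_t len = sizeof(type);
    if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len) != 0) {
        return -1;
    }
    return type;
}

// A stream socket cannot be combined with UDP, so socket() must fail.
bool stream_udp_rejected() {
    int fd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_UDP);
    if (fd >= 0) {
        ::close(fd);
        return false;
    }
    return true;
}
```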

3.1.2 bind()

bind() assigns a specific address from the socket's address family to the socket. For AF_INET and AF_INET6, for example, it assigns an IPv4 or IPv6 address plus a port number.

The function prototype
int bind(int socketfd, const struct sockaddr *addr, socklen_t addrlen);
Parameters
  • socketfd: the descriptor of the socket to bind.
  • addr: a pointer to a sockaddr structure containing the address and port number to bind to.
  • addrlen: the length of the address structure pointed to by addr.

The concrete structure passed as sockaddr depends on the address family the socket was created with.

For IPv4:

struct sockaddr_in {
    sa_family_t    sin_family; /* address family: AF_INET */
    in_port_t      sin_port;   /* port in network byte order */
    struct in_addr sin_addr;   /* internet address */
};
/* Internet address. */
struct in_addr {
	uint32_t       s_addr;     /* address in network byte order */
};

For IPv6:

struct sockaddr_in6 { 
    sa_family_t     sin6_family;   /* AF_INET6 */ 
    in_port_t       sin6_port;     /* port number */ 
    uint32_t        sin6_flowinfo; /* IPv6 flow information */ 
    struct in6_addr sin6_addr;     /* IPv6 address */ 
	uint32_t        sin6_scope_id; /* Scope ID (new in 2.4) */ 
};

struct in6_addr { 
	unsigned char   s6_addr[16];   /* IPv6 address */ 
};

For Unix domain sockets:

#define UNIX_PATH_MAX 108
struct sockaddr_un { 
    sa_family_t sun_family;               /* AF_UNIX */ 
    char        sun_path[UNIX_PATH_MAX];  /* pathname */ 
};
Return value

bind() returns 0 on success. On failure it returns SOCKET_ERROR on Windows, or -1 on Linux with errno set.
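Filling in sockaddr_in and calling bind() looks like this. The sketch (bind_ipv4 is a hypothetical helper) converts the port with htons() and the dotted address with inet_pton(), and binding to port 0 lets the kernel choose a free port, which getsockname() then reports back. A client such as our SDK normally skips bind() and lets connect() pick the local address.

```cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <cassert>

// Bind fd to ip:port (IPv4). Port 0 asks the kernel for any free port.
// Returns the port actually bound (host byte order), or -1 on error.
int bind_ipv4(int fd, const char* ip, unsigned short port) {
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);                        // network byte order
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {  // "a.b.c.d" -> in_addr
        return -1;
    }
    if (::bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        return -1;
    }

    // Read back the address the kernel actually assigned.
    struct sockaddr_in bound;
    socklen_t len = sizeof(bound);
    if (getsockname(fd, (struct sockaddr*)&bound, &len) != 0) {
        return -1;
    }
    return ntohs(bound.sin_port);
}
```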

3.1.3 listen()

After calling socket() and bind(), a server calls listen() to start listening on the socket; when a client then calls connect() to request a connection, the server receives the request.

The function prototype
int listen(int socketfd, int backlog);
Parameters
  • socketfd: the descriptor of the socket to listen on.
  • backlog: the maximum number of pending connections that can be queued on the socket.

By default, socket() creates an active socket, one that initiates connections. listen() turns it into a passive socket that waits for incoming connection requests.

3.1.4 connect()

The function prototype
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
Parameters
  • sockfd: the client socket descriptor.
  • addr: the server's socket address.
  • addrlen: the length of the socket address.

connect() returns 0 on success, or -1 on failure with errno set.

3.1.5 accept()

After calling socket(), bind() and listen(), a TCP server listens on the specified socket address. A TCP client calls socket() and then connect() to send a connection request to the server. When the server detects the request, it calls accept() to accept it, and the connection is established. Network I/O can then begin, with reads and writes that work much like reads and writes on ordinary files.

The function prototype
int accept(int socketfd, struct sockaddr *addr, socklen_t *addrlen); // returns the connected socket connect_fd
Parameters
  • socketfd: the listening socket described above. It is bound to the port that clients connect to; a client knows nothing about this socket beyond the server's address and port number.
  • addr: an output parameter that receives the connecting client's address. Set it to NULL if you are not interested in the client's address.
  • addrlen: a value-result parameter: on input, the size of the buffer pointed to by addr; on return, the actual size of the client's address. It can also be NULL when addr is NULL.

If accept() returns successfully, the server and the client are connected, and the server communicates with the client through the socket returned by accept().

By default, accept() blocks until a client connection is established, then returns a new socket: the connected socket.

  • Listening socket: the sockfd argument of accept(). The server creates it with socket() at startup (then binds and listens on it); it is called the listening socket descriptor.
  • Connected socket: the descriptor returned by accept(). It represents a point-to-point connection that already exists on the network.

A server usually creates only one listening socket descriptor, which lives as long as the server. The kernel creates a connected socket descriptor for each client connection the server process accepts, and when the server finishes serving that client, the connected socket descriptor is closed.

The connected socket socketfd_new does not use a new port to talk to the client: it uses the same local port number as the listening socket socketfd. Connections are told apart by the full (local address, local port, remote address, remote port) tuple.
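The server and client sequences from 3.1.2 to 3.1.5 can be exercised in a single process over the loopback interface. In this hypothetical sketch (loopback_exchange is an illustrative name), the client's blocking connect() returns before accept() is called, because the kernel completes the handshake and queues the connection on the listening socket; accept() then hands back a new connected socket. The sketch also checks the point above that the accepted socket shares the listening socket's local port.

```cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <string>
#include <cassert>

// Sends msg from a client to a server over 127.0.0.1 and returns what the
// server read, or "" on error. If same_port is non-NULL, it reports whether
// the accepted socket's local port equals the listening port.
std::string loopback_exchange(const std::string& msg, bool* same_port) {
    std::string result;
    char buf[256];
    if (msg.empty() || msg.size() > sizeof(buf)) return result;

    int lfd = ::socket(AF_INET, SOCK_STREAM, 0);          // server: socket()
    if (lfd < 0) return result;
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(0);                              // kernel picks a port
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);         // 127.0.0.1
    socklen_t len = sizeof(addr);

    int cfd = -1, afd = -1;
    if (::bind(lfd, (struct sockaddr*)&addr, sizeof(addr)) == 0 &&   // bind()
        ::listen(lfd, 8) == 0 &&                                     // listen()
        getsockname(lfd, (struct sockaddr*)&addr, &len) == 0) {      // learn the port
        cfd = ::socket(AF_INET, SOCK_STREAM, 0);                     // client: socket()
        // The blocking connect() returns once the kernel has queued the connection.
        if (cfd >= 0 && ::connect(cfd, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
            afd = ::accept(lfd, NULL, NULL);                         // server: accept()
        }
    }
    if (afd >= 0) {
        if (same_port) {
            struct sockaddr_in local;
            socklen_t llen = sizeof(local);
            *same_port = getsockname(afd, (struct sockaddr*)&local, &llen) == 0 &&
                         local.sin_port == addr.sin_port;
        }
        if (::send(cfd, msg.data(), msg.size(), 0) == (ssize_t)msg.size()) {
            // MSG_WAITALL: keep reading until all msg.size() bytes have arrived.
            ssize_t n = ::recv(afd, buf, msg.size(), MSG_WAITALL);
            if (n > 0) result.assign(buf, (size_t)n);
        }
        ::close(afd);                                      // connected socket
    }
    if (cfd >= 0) ::close(cfd);
    ::close(lfd);                                          // listening socket
    return result;
}
```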

3.1.6 read(), write(), etc.

Once the server and client have established a connection, they can call network I/O functions to read and write; this is how processes on different hosts communicate over the network. The network I/O functions come in the following groups:

read()/write()
recv()/send()
readv()/writev()
recvmsg()/sendmsg()
recvfrom()/sendto()
Function Prototype 1
ssize_t recv(int socket, void* buf, size_t len, int flags);
Parameter Description 1
  • socket: the descriptor of the connected socket.
  • buf: the buffer that receives the data.
  • len: the buffer length.
  • flags: the call mode. MSG_PEEK peeks at the incoming data: the data is copied into the buffer but not removed from the input queue. MSG_OOB processes out-of-band data. If no error occurs, recv() returns the number of bytes read; if the peer has closed the connection, it returns 0; otherwise it returns -1 and sets errno (on Windows, recv() returns SOCKET_ERROR and the error is retrieved with WSAGetLastError()).
Function Prototype 2
ssize_t recvfrom(int sockfd, void* buf, size_t len, int flags, struct sockaddr* from, socklen_t* fromlen);
Parameter Description 2
  • sockfd: the descriptor of the connected socket.
  • buf: the buffer that receives the data.
  • len: the buffer length.
  • flags: the call mode, a bitwise OR of zero or more of the following flags:
  • MSG_DONTWAIT: do not block (non-blocking operation).
  • MSG_ERRQUEUE: receive queued errors from the socket's error queue. The error is passed in an ancillary message whose type depends on the protocol, so the caller should supply a large enough buffer. The original packet that caused the error is passed as normal data via msg_iov, its original destination address is supplied as msg_name, and the error itself is delivered as a struct sock_extended_err.
  • MSG_PEEK: return data from the head of the receive queue without removing it, so a subsequent read returns the same data.
  • MSG_TRUNC: return the real length of the packet or datagram even if it is longer than the supplied buffer (originally only for packet sockets).
  • MSG_WAITALL: block until the full request is satisfied. The call may still return less data if a signal is caught, an error or disconnection occurs, or the next data to be received is of a different type than the data already returned.
  The following flags are not passed in; recvmsg() reports them in the msg_flags field of the returned struct msghdr:
  • MSG_EOR: marks the end of a record; the data returned completes a record.
  • MSG_TRUNC: the trailing part of a datagram was discarded because it was larger than the supplied buffer.
  • MSG_CTRUNC: some control (ancillary) data was discarded because the ancillary-data buffer was too small.
  • MSG_OOB: expedited or out-of-band data was received.
  • MSG_ERRQUEUE: no data was received, only an extended error from the socket error queue.
  • from: (optional) pointer to a buffer that receives the source address.
  • fromlen: (optional) value-result pointer to the length of the from buffer.
Function Prototype 3
ssize_t sendto(int s, const void* buf, size_t size, int flags, const struct sockaddr* to, socklen_t tolen);
Parameter Description 3
  • s: the socket descriptor.
  • buf: the buffer holding the data to send.
  • size: the buffer length.
  • flags: call mode flags, usually 0; setting flags changes how sendto() behaves.
  • to: (optional) pointer to the address of the destination socket.
  • tolen: the length of the address pointed to by to. sendto() returns the number of bytes sent on success, or -1 on failure (SOCKET_ERROR on Windows).
Function Prototype 4
int accept(int fd, struct sockaddr* addr, socklen_t* len);
Parameter Description 4
  • fd: the listening socket descriptor.
  • addr: receives the address of the connecting peer.
  • len: value-result length of the addr buffer. accept() returns the descriptor of the new client connection on success, or -1 on failure.
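The MSG_PEEK behavior and the 0-on-close return value of recv() are easy to observe with a connected Unix-domain socket pair, which keeps the example free of any network setup. Both helpers below are hypothetical illustrations, not part of the SDK.

```cpp
#include <sys/socket.h>
#include <unistd.h>
#include <string>
#include <cassert>

// Sends msg over a socket pair, then reads it twice on the other end:
// first with MSG_PEEK (the bytes stay queued), then with a normal recv()
// that consumes them. Returns the two reads joined by '|', or "" on error.
std::string peek_then_read(const std::string& msg) {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return "";
    std::string out;
    char buf[64];
    if (!msg.empty() && msg.size() <= sizeof(buf) &&
        send(sv[0], msg.data(), msg.size(), 0) == (ssize_t)msg.size()) {
        ssize_t n1 = recv(sv[1], buf, msg.size(), MSG_PEEK);  // look, don't consume
        if (n1 > 0) out.assign(buf, (size_t)n1);
        out += '|';
        ssize_t n2 = recv(sv[1], buf, msg.size(), 0);         // now consume
        if (n2 > 0) out.append(buf, (size_t)n2);
    }
    close(sv[0]);
    close(sv[1]);
    return out;
}

// recv() returns 0 once the peer has closed its end of the connection.
ssize_t recv_after_peer_close() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return -1;
    close(sv[0]);                  // the peer goes away
    char c;
    ssize_t n = recv(sv[1], &c, 1, 0);
    close(sv[1]);
    return n;
}
```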

3.1.7 close()

Once the server and client are connected, they perform their reads and writes; when these are finished, the corresponding socket descriptor must be closed.

The function prototype
int close(int fd);

By default, close() on a TCP socket marks the socket as closed and returns to the calling process immediately. The calling process can no longer use the descriptor, that is, it can no longer pass it as the first argument to read or write.

Note: close() only decrements the socket's reference count by 1. Only when the count drops to 0 does TCP start terminating the connection by sending a FIN to the peer.
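The reference-count rule can be observed with dup(): closing one of two descriptors for the same socket leaves the connection open, and the peer only sees end-of-stream (recv() returning 0) after the last descriptor is closed. This hypothetical sketch uses a Unix-domain socket pair to stay self-contained; for a TCP socket, that last close() is what starts the FIN exchange.

```cpp
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

// Returns true if the peer sees EOF only after every descriptor is closed.
bool close_is_refcounted() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return false;
    int extra = dup(sv[0]);            // a second descriptor for the same socket
    if (extra < 0) {
        close(sv[0]);
        close(sv[1]);
        return false;
    }

    close(sv[0]);                      // count 2 -> 1: the connection stays open
    bool still_open = send(extra, "x", 1, 0) == 1;
    char c = 0;
    bool got_data = recv(sv[1], &c, 1, 0) == 1 && c == 'x';

    close(extra);                      // count 1 -> 0: now the peer sees EOF
    bool eof = recv(sv[1], &c, 1, 0) == 0;
    close(sv[1]);
    return still_open && got_data && eof;
}
```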

3.2 Wrapping the socket

We encapsulate the socket and timeout configuration into a structure:

struct BlockSyncSocket
{
    SOCKET fd;
    int    family;
    int64_t rbytes;
    int64_t sbytes;
    // The send/recv timeout in ms.
    int64_t rtm;
    int64_t stm;
    
    BlockSyncSocket() {
        stm = rtm = UTIME_NO_TIMEOUT;
        rbytes = sbytes = 0;
        
        SOCKET_RESET(fd);
        SOCKET_SETUP();
    }

    virtual ~BlockSyncSocket() {
        SOCKET_CLOSE(fd);
        SOCKET_CLEANUP();
    }
};

Based on the analysis above, the wrapper needs to cover socket creation, connecting, reading and writing, and setting timeouts:

/**
 * simple socket stream,
 * use tcp socket, sync block mode
 */
class SimpleSocketStream
{
private:
    BlockSyncSocket* io;
public:
    SimpleSocketStream();
    virtual ~SimpleSocketStream();
public:
    virtual BlockSyncSocket* hijack_io();
    virtual int create_socket(std::string url);
    virtual int connect(const char* server, int port);

public:
    virtual error_t read(void* buf, size_t size, ssize_t* nread);

public:
    virtual void set_recv_timeout(utime_t tm);
    virtual utime_t get_recv_timeout();
    virtual int64_t get_recv_bytes();
public:
    virtual void set_send_timeout(utime_t tm);
    virtual utime_t get_send_timeout();
    virtual int64_t get_send_bytes();
    virtual error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
public:
    virtual error_t read_fully(void* buf, size_t size, ssize_t* nread);
    virtual error_t write(void* buf, size_t size, ssize_t* nwrite);
};

Next, we implement the network wrapper functions:

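#include <netinet/tcp.h>

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <sys/time.h>   // struct timeval for SO_RCVTIMEO/SO_SNDTIMEO

#include <sys/types.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>     // memset
#include <assert.h>     // assert
#include <netdb.h>      // getaddrinfo, freeaddrinfo

#include <bs_socket.hpp>
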
BlockSyncSocket* hijack_io_create()
{
    BlockSyncSocket* skt = new BlockSyncSocket();
    return skt;
}
void hijack_io_destroy(BlockSyncSocket* ctx)
{
    freep(ctx);
}
int hijack_io_create_socket(BlockSyncSocket* skt,std::string url)
{
    skt->family = AF_INET6;
    skt->fd = ::socket(skt->family, SOCK_STREAM, 0);   // Try IPv6 first.
    if (!SOCKET_VALID(skt->fd)) {
        // Fall back to IPv4 if an IPv6 socket cannot be created.
        skt->family = AF_INET;
        skt->fd = ::socket(skt->family, SOCK_STREAM, 0);
    }
    if (!SOCKET_VALID(skt->fd)) {
        return ERROR_SOCKET_CREATE;
    }

    // Disable Nagle's algorithm (TCP_NODELAY) so small packets go out immediately.
    int v = 1;
    setsockopt(skt->fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));

    return ERROR_SUCCESS;
}
int hijack_io_connect(BlockSyncSocket* skt, const char* server_ip, int port)
{
    char sport[8];
    snprintf(sport, sizeof(sport), "%d", port);
    
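    addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family   = skt->family;
    hints.ai_socktype = SOCK_STREAM;
    if (skt->family == AF_INET6) {
        // Let an IPv4 literal such as 127.0.0.1 resolve to an IPv4-mapped
        // IPv6 address, since the socket may have been created as AF_INET6.
        hints.ai_flags = AI_V4MAPPED;
    }
    
    addrinfo* r = NULL;
    if (getaddrinfo(server_ip, sport, (const addrinfo*)&hints, &r)) {
        return ERROR_SOCKET_CONNECT;
    }
    
    // getaddrinfo() allocates the result list; release it with freeaddrinfo().
    int rc = ::connect(skt->fd, r->ai_addr, r->ai_addrlen);
    freeaddrinfo(r);
    if (rc < 0) {
        return ERROR_SOCKET_CONNECT;
    }
    
    return ERROR_SUCCESS;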
}
int hijack_io_read(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nread)
{
    int ret = ERROR_SUCCESS;
    
    ssize_t nb_read = ::recv(skt->fd, (char*)buf, size, 0);
    
    if (nread) {
        *nread = nb_read;
    }

    // On success a non-negative integer indicating the number of bytes actually read is returned
    // (a value of 0 means the network connection is closed or end of file is reached).
    if (nb_read <= 0) {
        if (nb_read < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }
        
        if (nb_read == 0) {
            errno = SOCKET_ECONNRESET;
        }
        
        return ERROR_SOCKET_READ;
    }
    
    skt->rbytes += nb_read;
    
    return ret;
}
int hijack_io_set_recv_timeout(BlockSyncSocket* skt, int64_t tm)
{

    // The default for this option is zero,
    // which indicates that a receive operation shall not time out.
    int32_t sec = 0;
    int32_t usec = 0;
    
    if (tm != UTIME_NO_TIMEOUT) {
        sec = (int32_t)(tm / 1000);
        usec = (int32_t)((tm % 1000) * 1000);
    }
    
    struct timeval tv = { sec, usec };
    if (setsockopt(skt->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
        return SOCKET_ERRNO();
    }

    skt->rtm = tm;

    return ERROR_SUCCESS;
}

int hijack_io_set_send_timeout(BlockSyncSocket* skt, int64_t tm)
{

    // The default for this option is zero,
    // which indicates that a send operation shall not time out.
    int32_t sec = 0;
    int32_t usec = 0;

    if (tm != UTIME_NO_TIMEOUT) {
        sec = (int32_t)(tm / 1000);
        usec = (int32_t)((tm % 1000) * 1000);
    }
    
    struct timeval tv = { sec, usec };
    if (setsockopt(skt->fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
        return SOCKET_ERRNO();
    }
    
    skt->stm = tm;
    
    return ERROR_SUCCESS;
}

int hijack_io_writev(BlockSyncSocket* skt, const iovec *iov, int iov_size, ssize_t* nwrite)
{
    int ret = ERROR_SUCCESS;
    
    ssize_t nb_write = ::writev(skt->fd, iov, iov_size);
    
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    // On  success,  the  readv()  function  returns the number of bytes read;
    // the writev() function returns the number of bytes written.  On error, -1 is
    // returned, and errno is set appropriately.
    if (nb_write <= 0) {
        if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }
        
        return ERROR_SOCKET_WRITE;
    }

    skt->sbytes += nb_write;
    
    return ret;
}

int hijack_io_read_fully(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nread)
{
    int ret = ERROR_SUCCESS;
    
    size_t left = size;
    ssize_t nb_read = 0;
    
    while (left > 0) {
        char* this_buf = (char*)buf + nb_read;
        ssize_t this_nread;
        
        if ((ret = hijack_io_read(skt, this_buf, left, &this_nread)) != ERROR_SUCCESS) {
            return ret;
        }
        
        nb_read += this_nread;
        left -= (size_t)this_nread;
    }
    
    if (nread) {
        *nread = nb_read;
    }
    // skt->rbytes is not updated here: hijack_io_read() has already counted these bytes.
    
    return ret;
}
int hijack_io_write(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nwrite)
{
    
    int ret = ERROR_SUCCESS;
    
    ssize_t nb_write = ::send(skt->fd, (char*)buf, size, 0);
    
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    if (nb_write <= 0) {
        if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }
        
        return ERROR_SOCKET_WRITE;
    }
    
    skt->sbytes += nb_write;
    
    return ret;
}


error_t SimpleSocketStream::read(void* buf, size_t size, ssize_t* nread)
{
    assert(io);
    int ret = hijack_io_read(io, buf, size, nread);
    if (ret != ERROR_SUCCESS) {
        return error_new(ret, "read");
    }
    return success;
}

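hijack_io_read() reports ERROR_SOCKET_TIMEOUT only when the error code equals SOCKET_ETIME, a macro the article does not define. The sketch below (recv_timeout_errno is a hypothetical helper) sets SO_RCVTIMEO the same way hijack_io_set_recv_timeout() does and shows which errno a timed-out recv() produces. On Linux and macOS this is EAGAIN/EWOULDBLOCK, so the assumption here is that SOCKET_ETIME maps to that value on those platforms.

```cpp
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <cerrno>
#include <cassert>

// Sets a receive timeout of ms milliseconds, then calls recv() on a socket
// that never receives data. Returns the errno observed after recv() fails,
// 0 if recv() unexpectedly succeeded, or -1 on setup failure.
int recv_timeout_errno(int ms) {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return -1;
    struct timeval tv;
    tv.tv_sec = ms / 1000;
    tv.tv_usec = (ms % 1000) * 1000;   // same ms -> (sec, usec) split as the SDK
    int err = -1;
    if (setsockopt(sv[1], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == 0) {
        char buf[16];
        ssize_t n = recv(sv[1], buf, sizeof(buf), 0);  // blocks about ms, then fails
        err = (n < 0) ? errno : 0;
    }
    close(sv[0]);
    close(sv[1]);
    return err;
}
```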

We can now create a SimpleSocketStream in main and use it to create the socket. Next, we will start the RTMP handshake over the socket we created.

3.3 Testing

In our main.cpp:

#include <iostream>
#include <bs_socket.hpp>

int main(int argc, char* argv[])
{
    std::cout << "Hello rtmp server!" << std::endl;
    SimpleSocketStream* sss = new SimpleSocketStream();
    if (sss->create_socket("rtmp://127.0.0.1:1935/live/livestream") != 0) {
        std::cerr << "create socket error!" << std::endl;
        delete sss;   // allocated with new, so release with delete (not free)
        return -1;
    }
    std::cout << "create fd = " << sss->hijack_io()->fd << std::endl;
    delete sss;
    return 0;
}

Output:

$ ./rtmpsdk 
Hello rtmp server!
create fd = 3

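The socket was created successfully with descriptor 3 (descriptors 0, 1 and 2 are already taken by stdin, stdout and stderr, and the kernel hands out the lowest free number).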

Digression

Linux network programming offers synchronous/asynchronous and blocking/non-blocking models. Since our SDK is a client, it does not have to handle many concurrent connections, so our implementation uses blocking, synchronous sockets. We create the socket with IPv6 compatibility in mind: we try IPv6 first and fall back to IPv4 if that fails:

int hijack_io_create_socket(BlockSyncSocket* skt,std::string url)
{
    skt->family = AF_INET6;
    skt->fd = ::socket(skt->family, SOCK_STREAM, 0);   // Try IPv6 first.
    if (!SOCKET_VALID(skt->fd)) {
        // Fall back to IPv4 if an IPv6 socket cannot be created.
        skt->family = AF_INET;
        skt->fd = ::socket(skt->family, SOCK_STREAM, 0);
    }
    if (!SOCKET_VALID(skt->fd)) {
        return ERROR_SOCKET_CREATE;
    }

    // Disable Nagle's algorithm (TCP_NODELAY) so small packets go out immediately.
    int v = 1;
    setsockopt(skt->fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));

    return ERROR_SUCCESS;
}

setsockopt() configures socket options. Its level argument selects the protocol layer the option belongs to: IPPROTO_TCP for TCP-level options such as TCP_NODELAY, and IPPROTO_IP for IP-level options. In TCP/IP stacks, Nagle's algorithm is enabled for TCP by default; it improves network efficiency by reducing the number of small packets that have to be transmitted. In the kernel, outgoing and incoming data are first buffered, in the send buffer and the receive buffer respectively.
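The option set in hijack_io_create_socket() can be read back with getsockopt() at the same IPPROTO_TCP level. In this hedged sketch (get_nodelay and set_nodelay are hypothetical helper names), a fresh TCP socket starts with Nagle's algorithm enabled, meaning TCP_NODELAY is off, and setting the option flips it.

```cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <cassert>

// Returns 1 if TCP_NODELAY is on (Nagle disabled), 0 if off, -1 on error.
int get_nodelay(int fd) {
    int v = 0;
    socklen_t len = sizeof(v);
    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, &len) != 0) {
        return -1;
    }
    return v ? 1 : 0;   // some systems report a non-1 flag value, so normalize
}

// Turns TCP_NODELAY on or off; returns 0 on success, -1 on error.
int set_nodelay(int fd, bool on) {
    int v = on ? 1 : 0;
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
}
```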

Enabling TCP_NODELAY disables Nagle's algorithm, so small packets are sent immediately instead of being held back to be coalesced. For latency-sensitive applications that transfer small amounts of data, TCP_NODELAY is the right choice. RTMP carries live streams and is sensitive to delay, so we enable TCP_NODELAY, that is, we turn Nagle's algorithm off. SSH sessions are another example: the rate at which a user types commands is orders of magnitude below the network's bandwidth, so very little data is transferred, yet the user's input must be echoed back promptly with low latency. With Nagle's algorithm enabled, frequent delays are likely and the user experience suffers. You can also buffer at the application layer, for example by using a buffered stream in Java to write data into the kernel's send buffer in chunks as large as possible. Vectored I/O (the writev interface) is also a good option.
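As a small illustration of vectored I/O, this sketch (send_header_and_body is a hypothetical name) hands a header and a payload that live in separate buffers to the kernel in one writev() call, the same system call hijack_io_writev() wraps, instead of two back-to-back small writes. A Unix-domain socket pair keeps it self-contained; on a TCP socket the benefit is avoiding the consecutive small writes that interact badly with Nagle's algorithm.

```cpp
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
#include <string>
#include <cassert>

// Writes header and body with a single writev() call and returns what the
// peer read back, or "" on error.
std::string send_header_and_body(const std::string& header, const std::string& body) {
    char buf[256];
    size_t total = header.size() + body.size();
    if (total == 0 || total > sizeof(buf)) return "";
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return "";

    // Two separate buffers, gathered by the kernel into one write.
    struct iovec iov[2];
    iov[0].iov_base = const_cast<char*>(header.data());
    iov[0].iov_len = header.size();
    iov[1].iov_base = const_cast<char*>(body.data());
    iov[1].iov_len = body.size();

    std::string got;
    if (writev(sv[0], iov, 2) == (ssize_t)total) {
        // MSG_WAITALL: wait until all total bytes are available.
        ssize_t n = recv(sv[1], buf, total, MSG_WAITALL);
        if (n > 0) got.assign(buf, (size_t)n);
    }
    close(sv[0]);
    close(sv[1]);
    return got;
}
```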

With TCP_NODELAY turned off, Nagle's algorithm applies: while earlier data is still unacknowledged, small segments are held back, so data accumulates in the send buffer before it is sent. This improves network utilization (the ratio of payload to protocol headers rises considerably), but it inevitably adds latency. Combined with TCP's delayed ACK, the effect becomes more pronounced, with delays of roughly 40 ms. The problem mainly shows up when the application performs two writes in a row followed by a read.

Repeatedly writing small packets and then reading is a poor network programming pattern and should be optimized at the application level. Applications that need low latency, send large numbers of small messages, and still want high network utilization may have to use UDP and implement reliability themselves in the application layer.