Understanding Camera Streaming Media, Section 2: Playing RTSP in the Browser Without Plugins


Design of camera live broadcast system

  • Step 1: The user operates the camera from the front-end page.
  • Step 2: Commands are relayed to the Core service over a TCP-based protocol such as HTTP, MQTT, or AMQP. With HTTP, the address exposed by the Core service must be reachable from the public network; with MQTT or AMQP, the Core service only needs to be able to reach the public MQTT/AMQP broker address.
  • Step 3: The Core service acts as the bridge between the camera and the Stream service, so its network path to the camera must be unobstructed (cameras at a site, home, or office usually sit on a LAN, so the Core service must also be deployed on a host in the same LAN as the camera). On receiving a command, the Core service executes FFmpeg commands to pull and push streams, capture and upload snapshots, and upload video.
  • Step 4: Streaming media data (photos and videos) is transferred over the HTTP protocol.
  • Step 5: The Stream service processes the requests sent in step 4 and, communicating over WebSocket with jsmpeg.js, pushes the streaming media data to the front end.

Stream media transfer service

The Stream service needs a controller that receives the HTTP requests carrying streaming media. It then hands the media bytes from those requests to the WsHandler class, which holds the WebSocketSessions established by the front end through jsmpeg.js, and sends the camera's byte data over those sessions. The collaboration diagram is as follows:

1. Create a Maven project named Stream in IntelliJ and add the Spring Boot dependencies to pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.3.4.RELEASE</version>
            <!-- Class containing the main() entry point -->
            <configuration>
                <mainClass>ning.zhou.stream.StreamApplication</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                        <goal>build-info</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2. Write the launch class StreamApplication and integrate WebSocket configuration

@SpringBootApplication
public class StreamApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
}
@Configuration
@EnableWebSocket
public class WsConfiguration implements WebSocketConfigurer {

    @Autowired
    private WsIntercept wsIntercept;

    @Autowired
    private WsHandler wsHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        // Bind the handler to the WebSocket URI
        webSocketHandlerRegistry.addHandler(wsHandler, "/videoplay")
                // Configure the interceptor and allow cross-origin requests
                .addInterceptors(wsIntercept).setAllowedOrigins("*");
    }

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        // Set the maximum message size to 10 MB; if the buffer is too small,
        // large frames cause the connection to be closed automatically
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(10 * 1024 * 1024);
        container.setMaxBinaryMessageBufferSize(10 * 1024 * 1024);
        return container;
    }
}
@Component
public class WsIntercept extends HttpSessionHandshakeInterceptor {

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
        // The configuration is compatible with different browsers
        HttpServletRequest request = ((ServletServerHttpRequest) serverHttpRequest).getServletRequest();
        HttpServletResponse response = ((ServletServerHttpResponse) serverHttpResponse).getServletResponse();
        String header = request.getHeader("sec-websocket-protocol");
        if (StringUtils.isNotEmpty(header)) {
            response.addHeader("sec-websocket-protocol",header);
        }
        super.afterHandshake(serverHttpRequest, serverHttpResponse, webSocketHandler, e);
    }
}

The WsConfiguration and WsIntercept classes are boilerplate Spring Boot WebSocket configuration. The line worth focusing on is the one that binds the WebSocketHandler to a URI path: webSocketHandlerRegistry.addHandler(wsHandler, "/videoplay").

public class RtspUtils {

    static final int MIN_ARRAY_LEN = 2;
    static final int DIVIDE_INTO_PAIRS = 2;

    public static Map<String, String> parseRequestParam(String url) {
        Map<String, String> map = new HashMap<>();
        if (!url.contains("?")) {
            return null;
        }
        String[] parts = url.split("\\?", DIVIDE_INTO_PAIRS);
        if (parts.length < MIN_ARRAY_LEN) {
            return null;
        }
        String parsedStr = parts[1];
        if (parsedStr.contains("&")) {
            String[] multiParamObj = parsedStr.split("&");
            for (String obj : multiParamObj) {
                parseBasicParam(map, obj);
            }
            return map;
        }
        parseBasicParam(map, parsedStr);
        return map;
    }

    private static void parseBasicParam(Map<String, String> map, String str) {
        String[] paramObj = str.split("=");
        if (paramObj.length < MIN_ARRAY_LEN) {
            return;
        }
        map.put(paramObj[0], paramObj[1]);
    }
}
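The query-string parsing above can be exercised in isolation. Below is a minimal, self-contained re-implementation for illustration (the class name RtspUtilsDemo is hypothetical; the behavior mirrors RtspUtils.parseRequestParam):

```java
import java.util.HashMap;
import java.util.Map;

public class RtspUtilsDemo {

    // Stand-alone sketch of RtspUtils.parseRequestParam so the parsing
    // rules can be checked without the Spring project
    public static Map<String, String> parseRequestParam(String url) {
        if (!url.contains("?")) {
            return null; // no query string at all
        }
        String[] parts = url.split("\\?", 2); // split into path and query
        if (parts.length < 2) {
            return null;
        }
        Map<String, String> map = new HashMap<>();
        for (String pair : parts[1].split("&")) {
            String[] kv = pair.split("=");
            if (kv.length >= 2) {
                map.put(kv[0], kv[1]); // keep well-formed key=value pairs only
            }
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> params =
                parseRequestParam("ws://127.0.0.1:8080/videoplay?id=1&token=abc");
        System.out.println(params.get("id"));    // prints 1
        System.out.println(params.get("token")); // prints abc
    }
}
```

This is exactly the shape of URL that WsHandler receives from jsmpeg.js, which is why the handler can read the camera ID with `paramMap.get("id")`.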
@Component
public class WsHandler extends BinaryWebSocketHandler {

    private static final Logger logger = LogManager.getLogger(WsHandler.class);

    /**
     * Camera ID mapped to the list of subscribed user sessions
     */
    private Map<String, CopyOnWriteArrayList<WebSocketSession>> cameraClientsMap = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Map<String, String> paramMap = RtspUtils.parseRequestParam(session.getUri().toString());
        String cameraId = paramMap.get("id");
        // Thread-safe PUT
        cameraClientsMap.computeIfAbsent(cameraId, k -> new CopyOnWriteArrayList<>()).add(session);
        logger.info(session.getId() + " user online, watching camera " + cameraId);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        cameraClientsMap.values().forEach(webSocketSessions -> webSocketSessions.remove(session));
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        cameraClientsMap.values().forEach(webSocketSessions -> webSocketSessions.remove(session));
    }
    
    /**
     * Send data
     *
     * @param data streaming media data
     * @param id   camera ID
     */
    public void sendVideo(byte[] data, String id) {
        try {
            CopyOnWriteArrayList<WebSocketSession> webSocketSessions = cameraClientsMap.get(id);
            if (webSocketSessions != null && !webSocketSessions.isEmpty()) {
                for (WebSocketSession session : webSocketSessions) {
                    if (session.isOpen()) {
                        Thread.sleep(1);
                        session.sendMessage(new BinaryMessage(data));
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Failed to send video data for camera " + id, e);
        }
    }

}

WsHandler extends BinaryWebSocketHandler and overrides its afterConnectionEstablished, afterConnectionClosed, and handleTransportError methods to maintain the WebSocketSessions associated with each camera ID. Each WebSocketSession represents one client watching that camera, and is also the channel through which the server sends data to that client. The sendVideo method we defined sends each camera's media stream to every client watching that camera.
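The subscription bookkeeping in WsHandler can be sketched without any Spring dependency. In the sketch below, plain strings stand in for WebSocketSession objects, and the names (SubscriptionMapDemo, subscribe, disconnect) are illustrative only:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class SubscriptionMapDemo {

    // camera id -> subscriber "sessions"; strings stand in for WebSocketSession
    static final Map<String, CopyOnWriteArrayList<String>> cameraClients = new ConcurrentHashMap<>();

    // Mirrors afterConnectionEstablished: register the session under its camera id
    static void subscribe(String cameraId, String sessionId) {
        cameraClients.computeIfAbsent(cameraId, k -> new CopyOnWriteArrayList<>()).add(sessionId);
    }

    // Mirrors afterConnectionClosed / handleTransportError: the camera id is not
    // known when a connection drops, so every subscription list is swept
    static void disconnect(String sessionId) {
        cameraClients.values().forEach(sessions -> sessions.remove(sessionId));
    }

    public static void main(String[] args) {
        subscribe("1", "sessionA");
        subscribe("1", "sessionB");
        subscribe("2", "sessionA");
        disconnect("sessionA");
        System.out.println(cameraClients.get("1")); // prints [sessionB]
        System.out.println(cameraClients.get("2")); // prints []
    }
}
```

CopyOnWriteArrayList is a deliberate choice here: sendVideo iterates the list on a hot path while connections come and go, and copy-on-write lists allow iteration without locking or ConcurrentModificationException.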

3. Write the controller that receives streaming media requests

@RestController
public class RtspController {

    @Resource
    private WsHandler wsHandler;

    @PostMapping("/rtsp/receive")
    @ResponseBody
    public void receive(HttpServletRequest request, String id) throws Exception {
        ServletInputStream inputStream = request.getInputStream();
        byte[] buffer = new byte[4096];
        int len;
        // Read the media bytes pushed by FFmpeg; read() returns -1 at end of stream
        while ((len = inputStream.read(buffer)) != -1) {
            if (len > 0) {
                // Forward the streaming media data via WsHandler's sendVideo method
                wsHandler.sendVideo(java.util.Arrays.copyOf(buffer, len), id);
            }
        }
    }
}

4. Add jsmpeg.min.js to the resources/static directory and create index.html

<html>
<head>
</head>
<body>
<canvas id="video"></canvas>
<script type="text/javascript" src="jsmpeg.min.js"></script>
<script type="text/javascript">
    var canvas = document.getElementById('video');
    var url = 'ws://127.0.0.1:8080/videoplay?id=1';
    var player = new JSMpeg.Player(url, {canvas: canvas});
</script>
</body>
</html>

Modify the URL as needed: /videoplay is the WebSocket URI configured in the WsConfiguration.registerWebSocketHandlers method above, and id is the camera ID. Adjust id to preview the video stream from a different camera.

Testing the Stream service

1. Run the main method of StreamApplication.java

2. In a DOS window, run the FFmpeg push command. You can reuse the command from Section 1, replacing the parameters:

D:\ffmpeg\bin\ffmpeg -hwaccel auto -rtsp_transport tcp -i <RTSP address> -f mpegts -codec:v mpeg1video -bf 0 -codec:a mp2 -r 25 -b:v 1000k -s 960x520 -an <relay HTTP address>
  • 2.1 RTSP address: the camera's RTSP play address; here I use the RTSP address of the EZVIZ (Fluorite) Cloud camera in my home.
  • 2.2 Relay HTTP address: the REST address of the Stream service's RtspController.receive method. Note that the address must include the camera ID parameter. index.html plays the camera whose id is 1, so the HTTP relay address is http://127.0.0.1:8080/rtsp/receive?id=1 .
  • The -r parameter sets the frame rate (fps), -b:v the video bitrate, -s the resolution, and -an disables audio.

3. Open http://127.0.0.1:8080/index.html in the browser; the camera video previews successfully.

Extension & Thinking

In this section we eliminated websocket-relay.js from the Stream service, but simply pulling a stream by running a command in a DOS window is not acceptable to a Javaer. There are other issues too: we need a retry mechanism for the command, because FFmpeg codec errors will occasionally make the command fail and stop. In the following chapters we will solve these problems step by step in the Core service and build a highly available camera monitoring system.
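As a preview of the retry idea, restarting a failed external command from Java might be sketched with ProcessBuilder as below. The command, retry count, and back-off are illustrative assumptions, not the final Core service design:

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class FfmpegRunner {

    // Hypothetical sketch: run an external command (e.g. the FFmpeg push command)
    // and restart it when it exits abnormally, up to maxRetries times
    public static int runWithRetry(String[] command, int maxRetries)
            throws IOException, InterruptedException {
        int attempt = 0;
        int exitCode = -1;
        while (attempt <= maxRetries) {
            Process process = new ProcessBuilder(command)
                    .redirectErrorStream(true) // merge stderr so FFmpeg logs are not lost
                    .start();
            exitCode = process.waitFor();
            if (exitCode == 0) {
                break;                     // clean exit: stop retrying
            }
            attempt++;
            TimeUnit.SECONDS.sleep(1);     // brief back-off before restarting
        }
        return exitCode;
    }

    public static void main(String[] args) throws Exception {
        // A trivial command is used here so the sketch runs without FFmpeg installed
        int code = runWithRetry(new String[]{"java", "-version"}, 2);
        System.out.println("exit=" + code);
    }
}
```

A production version would also capture the process output for diagnostics and cap the total number of restarts; those refinements are left to the Core service chapters.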