UDP broadcast

Connection-oriented transport (such as TCP) manages the establishment of a connection between two network endpoints, the orderly and reliable transmission of messages throughout the life cycle of the connection, and finally, the orderly termination of the connection. In contrast, connectionless protocols like UDP do not have the concept of persistent connections, and UDP does not have the error correction mechanism of TCP. But UDP performs much better than TCP and is suitable for applications that can handle or tolerate message loss

All of our examples so far have used a transport pattern called unicast, defined as sending messages to a single network destination identified by a unique address. This pattern is supported by both connection-oriented and connectionless protocols

UDP provides additional transport modes for sending messages to multiple receivers:

  • Multicast: Transmission to a predefined host group
  • Broadcast: transmission to all hosts on the network (subnet)

The examples in this chapter demonstrate the use of UDP broadcasting by sending messages that can be received by all hosts on the same network

UDP sample application

Our program will open a file and then broadcast each line as a message to a specified port via UDP. The recipient can create an event monitor to receive messages by simply starting a listener on the specified port. This example uses the log file handler as an example

1. Message POJO: LogEvent

In this application, we will treat the message as an event, and since the data comes from a log file, we will call it LogEvent

public class LogEvent {

    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;

    public LogEvent(String logfile, String msg) {
        this(null, logfile, msg, -1);
    }

    public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    public InetSocketAddress getSource(a) {
        return source;
    }

    public String getLogfile(a) {
        return logfile;
    }

    public String getMsg(a) {
        return msg;
    }

    public long getReceived(a) {
        returnreceived; }}Copy the code

2. Programmers

Netty’s DatagramPacket is a simple message container, DatagramChannel and remote node communication, to convert LogEvent message into DatagramPacket, we need an encoder

The following is the code implementation of the encoder

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {

    private final InetSocketAddress remoteAddress;

    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(StandardCharsets.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(StandardCharsets.UTF_8);
        ByteBuf buf = ctx.alloc().buffer(file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR);
        buf.writeBytes(msg);
        out.add(newDatagramPacket(buf, remoteAddress)); }}Copy the code

Next prepare to bootstrap the server, including setting up ChannelOption and installing the required channelbroadcaster in the ChannelPipeline, which is done through the main class LogEventBroadcaster

public class LogEventBroadcaster {

    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }

    public void run(a) throws Exception {
        / / bind Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        for(; ;) {long len = file.length();
            if (len < pointer) {
                // The file pointer points to the last byte of the file
                pointer = len;
            } else if (len > pointer) {
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                // Sets the current file pointer
                raf.seek(pointer);
                String line;
                while((line = raf.readLine()) ! =null) {
                    ch.writeAndFlush(new LogEvent(null, line, file.getAbsolutePath(), -1));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break; }}}public void stop(a) {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if(args.length ! =2) {
            throw new InterruptedException();
        }
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255." ",
                Integer.parseInt(args[0)),new File(args[1]));
        try {
            broadcaster.run();
        }
        finally{ broadcaster.stop(); }}}Copy the code

3. Write monitors

Write a consumer program called LogEventMonitor, which includes:

  • Receive UDP DatagramPacket broadcast by LogEventBroadcaster
  • Decoded to LogEvent message
  • Process LogEvent messages

As before, the decoder LogEventDecoder is responsible for decoding the incoming DatagramPacket into LogEvent messages

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {

    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content();
        int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
        String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
        LogEvent event = newLogEvent(datagramPacket.sender(), logMsg, filename, System.currentTimeMillis()); out.add(event); }}Copy the code

Create a ChannelHandler to handle logEvents

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceived());
        builder.append("[");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("]."); builder.append(event.getMsg()); System.out.println(builder.toString()); }}Copy the code

Now you need to install LogEventDecoder and LogEventHandler into the ChannelPipeline. The following code shows how to do this from the LogEventMonitor main class

public class LogEventMonitor {

    private final EventLoopGroup group;
    private final Bootstrap bootstrap;

    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                })
                .localAddress(address);
    }

    public Channel bind(a) {
        return bootstrap.bind().syncUninterruptibly().channel();
    }

    public void stop(a) {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if(args.length ! =1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0)));try {
            Channel channel = monitor.bind();
            channel.closeFuture().sync();
        }
        finally{ monitor.stop(); }}}Copy the code