Starting with v4.1, the EMQ X MQTT server provides a dedicated multilingual support plug-in emqX_extension_hook that now supports the use of other programming languages to handle hook events in EMQ X, Developers can use Python or Java to quickly develop their own plug-ins, which can be extended based on official functions to meet their own business scenarios. Such as:

  • Verify the login permission of a client: When the client is connected, the corresponding function is triggered. After obtaining the client information based on the parameters, the client can read the database and compare the parameters to determine whether the client has the login permission
  • Record the client online status and online and offline history: When the client status changes, the corresponding function is triggered to obtain the client information and rewrite the client online status in the database
  • Verify the operation permission of PUB/SUB of a client: When publishing or subscribing, the corresponding function is triggered to obtain the client information and the current topic and determine whether the client has the corresponding operation permission
  • Process Sessions and Message events, realize subscription relationship and Message processing/storage: Trigger corresponding functions when messages are published and status changes, obtain the current client information, Message status and Message content, and forward them to Kafka or database for storage.

Note: Message class hooks are supported only in the Enterprise edition.

Python and Java drivers are based on Erlang/ OTP-port interprocess communication, which has very high throughput performance. This paper takes Java extension as an example to introduce the use of EMQ X cross-language extension.

Java extension usage examples

requirements

  • JDK 1.8 or later must be installed on the EMQ X server

Begin to use

  1. Creating a Java project
  2. Download the IO. Emqx.extension. jar and erlport.jar files
  3. Add the SDKio.emqx.extension.jaranderlport.jarTo project dependencies
  4. copyexamples/SampleHandler.javaInto your project
  5. According to the SDKSampleHandler.javaWrite business code to ensure successful compilation

The deployment of

After compiling all the source code, you need to deploy the SDK and code files into EMQ X:

  1. copyio.emqx.extension.jaremqx/data/extensiondirectory
  2. Will be compiled.classFiles, for exampleSampleHandler.classCopied to theemqx/data/extensiondirectory
  3. Modify theemqx/etc/plugins/emqx_extension_hook.confConfiguration file:
exhook.drivers = java
## Search path for scripts or library
exhook.drivers.java.path = data/extension/
exhook.drivers.java.init_module = SampleHandler
Copy the code

Start the emqx_extension_hook plug-in and it will not start properly if the configuration is wrong or the Java code is written incorrectly. After startup, try to establish an MQTT connection and observe the business.

The sample

Here is SampleHandler. Java sample program, the program the DefaultCommunicationHandler class inherits from the SDK. This sample code demonstrates how to mount all the hooks in an EMQ X system:

import emqx.extension.java.handler.*;
import emqx.extension.java.handler.codec.*;
import emqx.extension.java.handler.ActionOptionConfig.Keys;

public class SampleHandler extends DefaultCommunicationHandler {
    
    @Override
    public ActionOptionConfig getActionOption(a) {
        ActionOptionConfig option = new ActionOptionConfig();
        option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
        option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
        option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
        option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");
        
        return option;
    }
    
    // Clients
    @Override
    public void onClientConnect(ConnInfo connInfo, Property[] props) {
        System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props);
    }

    @Override
    public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
        System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props);
    }

    @Override
    public void onClientConnected(ClientInfo clientInfo) {
        System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Determine the authentication result, return true or false
    @Override
    public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
        System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult);

        return true;
    }

    // Determine the result of ACL check, return true or false
    @Override
    public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
        System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result);

        return true;
    }

    @Override
    public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
    }

    @Override
    public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
    }

    // Sessions
    @Override
    public void onSessionCreated(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
        System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    @Override
    public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
        System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    @Override
    public void onSessionResumed(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionDiscarded(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo);
    }
    
    @Override
    public void onSessionTakeovered(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Messages
    @Override
    public Message onMessagePublish(Message message) {
        System.err.printf("[Java] onMessagePublish: message: %s\n", message);
        
        return message;
    }

    @Override
    public void onMessageDropped(Message message, Reason reason) {
        System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason);
    }

    @Override
    public void onMessageDelivered(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message);
    }

    @Override
    public void onMessageAcked(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message); }}Copy the code

SampleHandler consists of two main parts:

  1. The getActionOption method is overridden. This method configures message-related hooks, specifying a list of topics that need to take effect.

    Configuration items Corresponding to the hook
    MESSAGE_PUBLISH_TOPICS message_publish
    MESSAGE_DELIVERED_TOPICS message_delivered
    MESSAGE_ACKED_TOPICS message_acked
    MESSAGE_DROPPED_TOPICS message_dropped
  2. Overloading the on

    methods, which are the callbacks that actually handle hook events. These methods are named by prefixing each hookName variant with on, or by using CamelCase with the underscore removed from the hookName, for example, The corresponding function of the hook client_connect is called onClientConnect. Events generated by the EMQ X client, such as connect, publish, subscribe, etc., are eventually distributed to the hook event callback functions, which can then operate on the properties and states. Only the parameters are printed out in the sample program. If you only care about part of the hook event, you only need to overload the callback function for that part of the hook event. You do not need to overload all of the callback functions.

The timing of each callback function and the list of supported Hooks are exactly the same as those built into EMQ X, see: reset-emq X

In achieving their own extensions, the simplest way is also inherited DefaultCommunicationHandler parent class, the class of binding of the hook and the callback function encapsulation, and further encapsulates the callback function of parameters involved in data structures, used for quick learning.

Development of advanced

If the Java extensions of controllable more demanding, DefaultCommunicationHandler class have been unable to meet demand, can be realized through CommunicationHandler interface, from the underlying code logic control, more write more flexible extension program.

package emqx.extension.java.handler;

public interface CommunicationHandler {
    
    public Object init();
    
    public void deinit();
}
Copy the code
  • init()Method: Used for initialization, declaring which hooks the extension needs to mount, and the mounted configuration
  • deinit()Method: Used for logging out.

For details on the data format, see the design document.

Copyright: EMQ

Original link: www.emqx.io/cn/blog/dev…