There are many types of Internet of Things (IoT) device terminals, and different manufacturers use different encoding formats. Therefore, when devices connect to an IoT platform, a unified data format is required so that applications on the platform can manage the devices.

EMQ X Enterprise Edition 3.4.0 introduces the Schema Registry, which provides encoding and decoding (codec) capabilities. The Schema Registry manages the schemas used by codecs, processes encoding and decoding requests, and returns the results. Working together with the rule engine, it can be adapted to device access and rule design in a wide range of scenarios.

The data format

The following figure shows a typical use case of the Schema Registry: data reported by multiple devices in different formats is decoded by the Schema Registry, converted into a unified internal format, and then forwarded to back-end applications.

[Figure 1: Codec device data using Schema Registry]

Binary format support

The Schema Registry in EMQ X 3.4.0 has built-in support for two data formats: Avro and Protobuf. Both are schema-dependent formats, and their encoded data is binary. Data decoded by the Schema Registry is converted into the internal data format (Map, described later), which the rule engine and other plug-ins can use directly. In addition, the Schema Registry supports user-defined (3rd-party) codec services, which can be implemented as HTTP or TCP callbacks to meet specific business requirements.

Architecture design

The Schema Registry stores the schema text for the built-in formats, Avro and Protobuf. For custom 3rd-party codecs, any schema that is needed is maintained by the codec service itself. The Schema Registry assigns a Schema ID to each schema, and the Schema API supports adding, querying, and deleting schemas by Schema ID.
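The ID bookkeeping described above can be sketched as a small in-memory registry. This is a hypothetical illustration, not EMQ X's implementation or API, and the class and method names are invented. The ID format follows the examples later in this article: "name:1.0" for a newly created built-in schema, and the bare name for a 3rd-party codec, which has no version management. Only the first version of a schema is modelled.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Formats whose schema text the registry itself stores (per the article)
BUILT_IN_TYPES = {"avro", "protobuf"}


@dataclass
class SchemaEntry:
    name: str
    codec_type: str
    text: Optional[str]  # None for 3rd-party codecs: the codec service keeps its own schema


class SchemaRegistry:
    """Hypothetical in-memory registry keyed by Schema ID."""

    def __init__(self) -> None:
        self._schemas: Dict[str, SchemaEntry] = {}

    def add(self, name: str, codec_type: str, text: Optional[str] = None) -> str:
        if codec_type in BUILT_IN_TYPES:
            schema_id = f"{name}:1.0"  # versioned; only the first version is modelled here
        else:
            schema_id = name  # 3rd-party codecs are not versioned
        if schema_id in self._schemas:
            raise ValueError(f"schema {schema_id!r} already exists")
        self._schemas[schema_id] = SchemaEntry(name, codec_type, text)
        return schema_id

    def get(self, schema_id: str) -> Optional[SchemaEntry]:
        return self._schemas.get(schema_id)

    def delete(self, schema_id: str) -> None:
        self._schemas.pop(schema_id, None)


registry = SchemaRegistry()
pb_id = registry.add("protobuf_person", "protobuf", "message Person { ... }")
custom_id = registry.add("my_parser", "3rd-party")
registry.delete(custom_id)
```

Keying everything by Schema ID keeps add, query, and delete as simple dictionary operations. Real version management would need extra state per schema name.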

The Schema Registry supports both encoding and decoding; in both cases, the Schema ID must be specified.

[Figure 2: Schema Registry Schema diagram]

Example of an encoding call (the first argument is the Schema ID):

schema_encode(SchemaID, Data) -> RawData

Example of a decoding call:

schema_decode(SchemaID, RawData) -> Data

A common use case is for the rule engine to call the encoding and decoding interfaces provided by the Schema Registry and then use the encoded or decoded data as input to subsequent actions.
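The two calls share one contract. The Schema ID selects the codec, and decoding what was encoded returns the original data. Below is a minimal Python sketch of that dispatch. The codec table and the "json_demo:1.0" ID are hypothetical, the functions only borrow the names of the rule SQL functions, and JSON stands in for Avro or Protobuf so that the example has no dependencies.

```python
import json
from typing import Any, Callable, Dict, Tuple

Encoder = Callable[[Any], bytes]
Decoder = Callable[[bytes], Any]

# Hypothetical codec table: Schema ID -> (encode, decode).
# JSON stands in for Avro/Protobuf so the sketch needs no extra packages.
CODECS: Dict[str, Tuple[Encoder, Decoder]] = {
    "json_demo:1.0": (
        lambda data: json.dumps(data, sort_keys=True).encode("utf-8"),
        lambda raw: json.loads(raw),
    ),
}


def schema_encode(schema_id: str, data: Any) -> bytes:
    encode, _ = CODECS[schema_id]  # KeyError for an unknown Schema ID
    return encode(data)


def schema_decode(schema_id: str, raw: bytes) -> Any:
    _, decode = CODECS[schema_id]
    return decode(raw)


raw = schema_encode("json_demo:1.0", {"name": "Steve", "id": 1})
data = schema_decode("json_demo:1.0", raw)
```

Here `raw` is the binary form (`b'{"id": 1, "name": "Steve"}'`), and `data` is equal to the original dictionary again.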

Codec + rule engine

Message processing in EMQ X can be divided into three layers: message routing (Messaging), the rule engine (Rule Engine), and data format conversion (Data Conversion).

EMQ X's pub/sub system routes messages to the specified topics. The rule engine lets you flexibly configure business rules for data, match messages against those rules, and then specify the actions to take. Data format conversion happens before rule matching: the data is first converted into a Map, which can then take part in rule matching.
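The ordering described above can be sketched in Python. The function and its parameters are hypothetical, not EMQ X internals, and `json.loads` stands in for a Schema Registry codec. The point is only that rule matching and actions operate on the converted Map rather than on the raw payload.

```python
import json
from typing import Any, Callable, Dict, List

Map = Dict[str, Any]


def process(topic: str, payload: bytes,
            convert: Callable[[bytes], Any],
            rule: Callable[[Map], bool],
            action: Callable[[Map], None]) -> bool:
    """Data conversion first, then rule matching, then the action."""
    # 1. Data conversion: the raw payload becomes a Map the rule can inspect
    data: Map = {"topic": topic, "payload": convert(payload)}
    # 2. Rule matching operates on the converted Map, not on raw bytes
    if not rule(data):
        return False
    # 3. The matched, converted data is handed to the action
    action(data)
    return True


republished: List[Map] = []
matched = process(
    "t/1", b'{"name": "Shawn"}',
    convert=json.loads,
    rule=lambda d: d["payload"]["name"] == "Shawn",
    action=republished.append,
)
```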

[Figure 3: Messaging, Rule Engine and Schema Registry]

Rule Engine Internal Data Format (Map)

The data format used internally by the rule engine is the Erlang Map. If the original data is binary or in some other format, it must be converted into a Map with a codec function, such as schema_decode above or json_decode described below.

A Map is a key-value data structure of the form #{Key => Value}. For example, User = #{id => 1, name => "Steve"} defines a user Map whose id is 1 and whose name is "Steve".

The rule SQL provides the "." operator for extracting and adding Map fields at nested levels. Here is an example of a Map operation in SQL:

SELECT user.id AS my_id

Applied to the user Map above, this statement produces #{my_id => 1}.
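Nested field access with "." can be mimicked in Python, with dicts standing in for Erlang Maps. This hypothetical helper simply raises KeyError for a missing field. How the rule engine treats missing fields is not covered here.

```python
from typing import Any, Dict


def get_path(data: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path like 'user.id' through nested dicts."""
    value: Any = data
    for key in path.split("."):
        value = value[key]  # KeyError if a level is missing
    return value


def select_as(data: Dict[str, Any], path: str, alias: str) -> Dict[str, Any]:
    """Rough analogue of: SELECT <path> AS <alias>"""
    return {alias: get_path(data, path)}


env = {"user": {"id": 1, "name": "Steve"}}
print(select_as(env, "user.id", "my_id"))  # {'my_id': 1}
```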

JSON codec

The rule engine's SQL supports encoding and decoding JSON strings: json_decode() converts a JSON string into a Map, and json_encode() converts a Map into a JSON string:

SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y and topic =~ 't/#'

For example, this rule matches a message whose payload is {"x": 1, "y": 1} and whose topic is t/a.

json_decode(payload) AS p decodes the JSON string into the following Map, so that its fields can be referenced in the WHERE clause as p.x and p.y:

#{
  p => #{
    x => 1,
    y => 1}}

Note: an AS clause is required to bind the decoded data to a key before its fields can be referenced.
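The WHERE clause above can be mirrored in Python to show what is being evaluated. In this sketch the names are hypothetical: `json.loads` plays the role of json_decode, and `topic_matches` implements MQTT topic filter matching. In that matching, "+" matches exactly one level, and "#" matches all remaining levels, including the parent level itself. Special cases such as topics beginning with "$" are not handled.

```python
import json
from typing import Any, Dict, Optional


def topic_matches(topic: str, topic_filter: str) -> bool:
    """MQTT topic filter matching: '+' is one level, '#' is all remaining levels."""
    topic_levels = topic.split("/")
    filter_levels = topic_filter.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # in a valid filter '#' is the last level; it matches the rest
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(topic_levels) == len(filter_levels)


def evaluate(topic: str, payload: bytes) -> Optional[Dict[str, Any]]:
    """Mirror of: SELECT json_decode(payload) AS p ... WHERE p.x = p.y and topic =~ 't/#'"""
    selected = {"p": json.loads(payload)}  # json_decode(payload) AS p
    p = selected["p"]
    if p["x"] == p["y"] and topic_matches(topic, "t/#"):
        return selected
    return None


print(evaluate("t/a", b'{"x": 1, "y": 1}'))  # {'p': {'x': 1, 'y': 1}}
```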

Codec in practice

Protobuf data parsing example

Rule requirements

The device publishes a Protobuf-encoded binary message. The rule engine must match the message and republish it to a topic derived from its "name" field, in the format "person/${name}".

For example, a message whose "name" field is "Shawn" is republished to the topic "person/Shawn".

Create Schema

Create a Protobuf Schema using the following parameters in the Dashboard of EMQ X:

  1. Name: protobuf_person

  2. Codec type: protobuf

  3. Schema: The following Protobuf Schema defines a Person message.

    message Person {
      required string name = 1;
      required int32 id = 2;
      optional string email = 3;
    }

After the schema is created, EMQ X assigns it a Schema ID and a version. When "protobuf_person" is created for the first time, its Schema ID is "protobuf_person:1.0".

Create rules

Write the rule SQL using the Schema ID you just created:

SELECT
  schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload
FROM
  "message.publish"
WHERE
  topic =~ 't/#' and person.name = 'Shawn'

The key part is schema_decode('protobuf_person:1.0', payload, 'Person'):

  • schema_decode decodes the contents of the payload field according to the 'protobuf_person:1.0' schema;
  • as person stores the decoded value in the variable "person";
  • the last argument, 'Person', specifies that the message in the payload is of the Person type defined in the Protobuf schema.
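To make the decoding step concrete, here is a minimal hand-written decoder for the Protobuf wire format of the Person message. It only illustrates what decoding against the 'Person' type involves and is not how EMQ X implements schema_decode. It handles only the two wire types this schema uses (varint and length-delimited) and does not check required fields.

```python
from typing import Any, Dict, Tuple

# Field numbers from the Person schema: name = 1, id = 2, email = 3
FIELD_NAMES = {1: "name", 2: "id", 3: "email"}


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read a base-128 varint starting at pos; return (value, next_pos)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_person(buf: bytes) -> Dict[str, Any]:
    """Decode a serialized Person message into a dict (a stand-in for a Map)."""
    msg: Dict[str, Any] = {}
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_no, wire_type = key >> 3, key & 0x07
        if wire_type == 0:  # varint: used by int32 id
            value, pos = read_varint(buf, pos)
            if value >= 1 << 63:  # negative int32 values are sign-extended to 64 bits
                value -= 1 << 64
        elif wire_type == 2:  # length-delimited: used by the string fields
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} is not used by Person")
        msg[FIELD_NAMES.get(field_no, str(field_no))] = value
    return msg


print(decode_person(b"\n\x05Shawn\x10\x01"))  # {'name': 'Shawn', 'id': 1}
```

Each field is prefixed by a key, `(field_number << 3) | wire_type`. That is why the payload printed by the device code starts with `\n` (field 1, length-delimited) and contains `\x10` (field 2, varint).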

Then add the action with the following parameters:

  • Action type: Message republish
  • Destination topic: person/${person.name}
  • Message content template: ${person}

This action sends the decoded "person" in JSON format to the topic person/${person.name}, where ${person.name} is a variable placeholder that is replaced at run time with the value of the "name" field in the decoded message.
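The placeholder substitution in the action can be illustrated with a short Python function. This is a hypothetical sketch of the idea, not the rule engine's template engine. In particular, rendering nested Maps as JSON and scalars as plain text is an assumption based on the description of ${person} above.

```python
import json
import re
from typing import Any, Dict

# Matches ${a.b.c} placeholders
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def render(template: str, data: Dict[str, Any]) -> str:
    """Replace each ${path} with the value found at that dotted path in data."""

    def lookup(match: "re.Match[str]") -> str:
        value: Any = data
        for key in match.group(1).split("."):
            value = value[key]
        # Assumption: Maps become JSON text, scalars become plain strings
        return json.dumps(value) if isinstance(value, dict) else str(value)

    return PLACEHOLDER.sub(lookup, template)


selected = {"person": {"name": "Shawn", "id": 1}}
topic = render("person/${person.name}", selected)  # "person/Shawn"
body = render("${person}", selected)               # '{"name": "Shawn", "id": 1}'
```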

Device side code

Once the rules are created, you can simulate the data for testing.

The following Python code populates a Person message, encodes it as binary data, and publishes it to the topic "t/1". See the complete code.

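# person_pb2 is generated from the Person schema above with protoc,
# e.g. `protoc --python_out=. person.proto` (the file name is an assumption).
import person_pb2


def publish_msg(client):
    # client is a connected paho-mqtt Client
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "[email protected]"
    # Serialize the message to Protobuf binary
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)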

Check the rule execution result

  1. In the Dashboard's WebSocket tool, connect an MQTT client and subscribe to "person/#".

  2. Install the Python dependencies and run the device-side code:

$ pip3 install protobuf
$ pip3 install paho-mqtt

$ python3 ./pb2_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\[email protected]'
t/1 b'\n\x05Shawn\x10\x01\x1a\[email protected]'
  3. Check that the WebSocket client receives a message on the topic person/Shawn:
{"email":"[email protected]","id":1,"name":"Shawn"}

Avro data parsing example

Rule requirements

The device publishes an Avro-encoded binary message. After the rule engine matches it, the message must be republished to a topic derived from its "name" field, in the format "avro_user/${name}".

For example, a message whose "name" field is "Shawn" is republished to the topic "avro_user/Shawn".

Create Schema

In the Dashboard of EMQ X, create an Avro Schema using the following parameters:

  1. Name: avro_user

  2. Codec type: avro

  3. Schema:

    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
      ]
    }

After the schema is created, EMQ X assigns it a Schema ID and a version. When "avro_user" is created for the first time, its Schema ID is "avro_user:1.0".

Create rules

Write the rule SQL using the Schema ID you just created:

SELECT
  schema_decode('avro_user:1.0', payload) as avro_user, payload
FROM
  "message.publish"
WHERE
  topic =~ 't/#' and avro_user.name = 'Shawn'

The key part is schema_decode('avro_user:1.0', payload):

  • schema_decode decodes the payload field according to the 'avro_user:1.0' schema;
  • as avro_user stores the decoded value in the variable "avro_user".

Then add the action with the following parameters:

  • Action type: Message republish
  • Destination topic: avro_user/${avro_user.name}
  • Message content template: ${avro_user}

This action sends the decoded "avro_user" in JSON format to the topic avro_user/${avro_user.name}, where ${avro_user.name} is a variable placeholder that is replaced at run time with the value of the "name" field in the decoded message.

Device side code

Once the rules are created, you can simulate the data for testing.

The following Python code populates a User record, encodes it as binary data, and publishes it to the topic "t/1". See the complete code.

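import io
import avro.io
import avro.schema

# SCHEMA must match the "avro_user" schema registered in the Dashboard;
# reading it from "user.avsc" (a hypothetical file name) is an assumption.
with open("user.avsc") as f:
    SCHEMA = avro.schema.parse(f.read())

def publish_msg(client):
    # Serialize a User record to raw Avro binary (no container-file header)
    datum_w = avro.io.DatumWriter(SCHEMA)
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder)
    message = buf.getvalue()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)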
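To see where the payload bytes come from, the same record can be encoded by hand using the binary encoding rules of the Avro specification:

- Strings are a zig-zag varint length followed by UTF-8 bytes.
- Ints are zig-zag varints.
- A union is the zero-based index of the chosen branch, followed by the value.

This hand-rolled encoder only illustrates the avro_user schema above and is not a replacement for the avro library.

```python
from typing import Any, Dict


def zigzag_varint(n: int) -> bytes:
    """Avro int/long: zig-zag encode n, then write it as a base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_user(user: Dict[str, Any]) -> bytes:
    """Encode a record of the avro_user schema (fields in schema order)."""
    out = encode_string(user["name"])
    # favorite_number: union ["int", "null"] -> branch 0 = int, branch 1 = null
    if user.get("favorite_number") is None:
        out += zigzag_varint(1)
    else:
        out += zigzag_varint(0) + zigzag_varint(user["favorite_number"])
    # favorite_color: union ["string", "null"] -> branch 0 = string, branch 1 = null
    if user.get("favorite_color") is None:
        out += zigzag_varint(1)
    else:
        out += zigzag_varint(0) + encode_string(user["favorite_color"])
    return out


payload = encode_user({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"})
print(payload)  # b'\nShawn\x00\xb4\n\x00\x06red'
```

The result matches the payload printed by avro_mqtt.py. For example, 666 zig-zags to 1332, which becomes the two varint bytes `\xb4\n`.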

Check the rule execution result

  1. In the Dashboard's WebSocket tool, connect an MQTT client and subscribe to "avro_user/#".

  2. Install the Python dependencies and run the device-side code:

$ pip3 install avro
$ pip3 install paho-mqtt

$ python3 avro_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'\nShawn\x00\xb4\n\x00\x06red'
  3. Check that the WebSocket client receives a message on the topic avro_user/Shawn:
{"favorite_color":"red","favorite_number":666,"name":"Shawn"}

Custom codec example

Rule requirements

The device publishes an arbitrary message, which is used to verify that the self-deployed codec service is working properly.

Create Schema

Create a 3rd-party Schema using the following parameters on the Dashboard of EMQ X:

  1. Name: my_parser
  2. Codec type: 3rd-party
  3. 3rd-party type: HTTP
  4. URL: http://127.0.0.1:9003/parser
  5. Codec configuration: XOR

Keep the defaults for the other settings. EMQ X assigns the Schema ID "my_parser"; custom codecs have no version management.

The codec configuration in item 5 is an optional string whose meaning is defined by the business logic of the codec service.

Create rules

Write the rule SQL using the Schema ID you just created:

SELECT
  schema_encode('my_parser', payload) as encoded_data,
  schema_decode('my_parser', encoded_data) as decoded_data
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

This SQL statement encodes the data and then decodes it again to verify that the codec round trip is correct:

  • schema_encode encodes the content of the payload field according to the 'my_parser' schema and stores the result in the variable encoded_data;
  • schema_decode decodes encoded_data according to the 'my_parser' schema and stores the result in the variable decoded_data.

The SQL statement therefore selects two fields: encoded_data and decoded_data.

Then add the action with the following parameters:

  • Action type: Inspect (debug)

This inspect action prints the SQL results to the EMQ X console (Erlang shell).

If EMQ X was started with emqx console, the output is displayed directly in the console. If it was started with emqx start, the output is written to the erlang.log.N file in the log directory, where N is an integer, for example erlang.log.1 or erlang.log.2.

Codec server code

Once the rule is created, you can simulate data for testing, but first you need to write your own codec service.

The following Python code implements an HTTP codec service. For simplicity, the service provides two simple codec (encryption and decryption) methods. See the complete code:

  • Bitwise exclusive OR (XOR)
  • Character substitution

import array

# Assumed definitions (the complete example defines these elsewhere):
# secret = b'a' matches the console output later (b'hello' XOR b'a' -> <<9,4,13,13,14>>),
# and shift() rotates within printable ASCII '!'..'~', as the doctests imply.
secret = b'a'

def shift(c, n):
    return (c - 33 + n) % 94 + 33

def xor(data):
    """
    >>> xor(xor(b'abc'))
    b'abc'
    >>> xor(xor(b'!}~*'))
    b'!}~*'
    """
    length = len(data)
    bdata = bytearray(data)
    bsecret = bytearray(secret * length)
    result = bytearray(length)
    for i in range(length):
        result[i] = bdata[i] ^ bsecret[i]
    return bytes(result)

def subst(dtype, data, n):
    """
    >>> subst('decode', b'abc', 3)
    b'def'
    >>> subst('decode', b'ab~', 1)
    b'bc!'
    >>> subst('encode', b'def', 3)
    b'abc'
    >>> subst('encode', b'bc!', 1)
    b'ab~'
    """
    adata = array.array('B', data)
    for i in range(len(adata)):
        if dtype == 'decode':
            adata[i] = shift(adata[i], n)
        elif dtype == 'encode':
            adata[i] = shift(adata[i], -n)
    return bytes(adata)


Run the service:

$ pip3 install flask
$ python3 http_parser_server.py
 * Serving Flask app "http_parser_server" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)


Check the rule execution result

Because this example is relatively simple, we use the MQTT Websocket client directly to simulate sending a message from the device side.

  1. In the Dashboard's WebSocket tool, connect an MQTT client and publish the message "hello" to "t/1".

  2. Check the output in the EMQ X console (Erlang shell):

(emqx@127.0.0.1)1> [inspect]
        Selected Data: #{decoded_data => <<"hello">>,
                         encoded_data => <<9,4,13,13,14>>}
        Envs: #{event => 'message.publish',
                flags => #{dup => false,retain => false},
                from => <<"mqttjs_76e5a35b">>,
                headers =>
                    #{allow_publish => true,
                      peername => {{127,0,0,1},54753},
                      username => <<>>},
                id => <<0,5,146,30,146,38,123,81,244,66,0,0,62,117,0,1>>,
                node => 'emqx@127.0.0.1',payload => <<"hello">>,qos => 0,
                timestamp => {1568,34882,222929},
                topic => <<"t/1">>}
        Action Init Params: #{}

Selected Data is the data selected by the SQL statement, Envs holds the environment variables available inside the rule engine, and Action Init Params holds the action's initialization parameters. All three are Maps.

decoded_data and encoded_data correspond to the AS aliases in the SELECT statement. Because decoded_data is the payload after being encoded and then decoded, it is restored to the content we sent, "hello", which shows that the codec service is working correctly.
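The values in Selected Data can be checked by hand. encoded_data is exactly what XOR-ing each byte of "hello" with the single byte b"a" produces, so the key b"a" is inferred from the output rather than taken from the article. XOR-ing a second time with the same key restores the original bytes, which is why decoded_data equals the payload.

```python
def xor_with(data: bytes, key: bytes) -> bytes:
    """XOR each byte of data with the key, repeating the key as needed."""
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


KEY = b"a"  # inferred from the console output, not taken from the article

encoded = xor_with(b"hello", KEY)
decoded = xor_with(encoded, KEY)
print(list(encoded))  # [9, 4, 13, 13, 14]
print(decoded)        # b'hello'
```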


For more information, please visit our official website emqx.io or follow our open-source project at github.com/emqx/emqx. For more details, please see our official documentation.