Kafka Connect is part of Apache Kafka® and provides streaming integration between data stores and Kafka. For data engineers, all it takes is JSON configuration files. Connectors already exist for many data stores, including JDBC, Elasticsearch, IBM MQ, S3, and BigQuery, to name a few.

For developers, Kafka Connect has a rich API, and you can develop your own connector if necessary. It also has a REST API for configuring and managing connectors.

Kafka Connect itself is modular, which makes it a very powerful way to meet integration requirements. Some of the key components include:

  • Connectors: the JAR files that define how to integrate with the data store;
  • Converters: handle serialization and deserialization of data;
  • Transforms: optional processing of messages in flight.

One of the most common sources of mistakes and misconceptions around Kafka Connect is the serialization of data, which Kafka Connect handles through converters. Here is how converters work and how to deal with some common problems.

Kafka messages are just bytes

Kafka messages are organized by topic. Each message is a key/value pair, and that is all Kafka needs. When data is stored in Kafka, it is only bytes, which makes Kafka suitable for a wide variety of scenarios, but it also means that it is the developer's responsibility to decide how to serialize the data.
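To make the "just bytes" point concrete, here is a minimal Python sketch. The record and the helper names are illustrative assumptions, not part of Kafka. It serializes the same record in two ways and shows what happens when the reader assumes the wrong format:

```python
import json

# A hypothetical record; Kafka itself never sees this structure, only bytes.
record = {"userid": "User_1", "regionid": "Region_5"}

def serialize_json(rec: dict) -> bytes:
    """Serialize a record as UTF-8 encoded JSON."""
    return json.dumps(rec).encode("utf-8")

def serialize_delimited(rec: dict) -> bytes:
    """Serialize the record's values as a comma-delimited string."""
    return ",".join(str(v) for v in rec.values()).encode("utf-8")

def deserialize_json(raw: bytes) -> dict:
    """Deserialize bytes that are expected to hold UTF-8 JSON."""
    return json.loads(raw.decode("utf-8"))

json_bytes = serialize_json(record)
csv_bytes = serialize_delimited(record)

# Matching writer and reader formats round-trip cleanly.
round_tripped = deserialize_json(json_bytes)

# Mismatched formats fail at read time, not at write time.
try:
    deserialize_json(csv_bytes)
    mismatch_detected = False
except json.JSONDecodeError:
    mismatch_detected = True
```

Nothing stops the delimited bytes from being written; the failure only surfaces when a reader tries to interpret them as JSON.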

When configuring Kafka Connect, one of the key things to standardize on is the serialization format. Make sure that the readers and writers of a topic use the same serialization format, otherwise confusion and errors will follow!

There are many common formats, including:

  • JSON;
  • Avro;
  • Protobuf;
  • Delimited strings (for example, CSV).

Each format has advantages and disadvantages.

Choosing a serialization format

Some principles for choosing a serialization format include:

  • Schema: Many times your data has a schema. You may not like this fact, but it is your responsibility as a developer to preserve and propagate this schema, because it provides the contract between services. Some message formats (such as Avro and Protobuf) have strong schema support, while others have little (JSON) or none at all (delimited strings);
  • Ecosystem compatibility: Avro is a first-class citizen of the Confluent Platform, with native support from Confluent Schema Registry, Kafka Connect, KSQL, and more. Protobuf relies on community contributions for support of some features;
  • Message size: JSON is a plain-text format, so message size depends on whatever compression is configured in Kafka itself, whereas Avro and Protobuf are binary formats and therefore produce smaller messages;
  • Language support: Avro is well supported in the Java ecosystem, but applications that are not Java-based may find it harder to work with.

If you use JSON format to write to the target, do you need to use JSON format in the topic?

No, the format in which the data is read from the source or written to external storage does not affect the format in which messages are serialized in Kafka.

Connectors in Kafka Connect are responsible for reading data from a source (such as a database) and passing it to the converter as an internal representation of the data. The converter then serializes this source data object onto the topic.

When using Kafka Connect as a sink, the reverse is true: the converter deserializes the data from the topic into the internal representation, which is passed to the connector and then written to the target using whatever method is appropriate for that target.

This means that you can keep data in topics in a format such as Avro, and then specify separately the format the sink connector uses when writing it to, for example, HDFS.
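The division of labor described above can be sketched in a few lines of Python. This is a toy model, not the Kafka Connect API: `ToyJsonConverter`, `source_side`, and `sink_side` are hypothetical names chosen for illustration.

```python
import json

class ToyJsonConverter:
    """Hypothetical stand-in for a converter: internal record <-> bytes."""

    def from_connect_data(self, record: dict) -> bytes:
        # Serialize the internal representation for storage in the topic.
        return json.dumps(record).encode("utf-8")

    def to_connect_data(self, raw: bytes) -> dict:
        # Deserialize topic bytes back into the internal representation.
        return json.loads(raw.decode("utf-8"))

def source_side(row: tuple, converter) -> bytes:
    """Source connector: read a (hypothetical) database row, hand the
    internal representation to the converter, and get topic bytes back."""
    internal = {"id": row[0], "name": row[1]}
    return converter.from_connect_data(internal)

def sink_side(raw: bytes, converter) -> str:
    """Sink connector: the converter deserializes the topic bytes; the
    connector then writes in its own target format (here, a CSV line)."""
    internal = converter.to_connect_data(raw)
    return f"{internal['id']},{internal['name']}"

converter = ToyJsonConverter()
topic_bytes = source_side((1, "Rick Astley"), converter)
written_to_target = sink_side(topic_bytes, converter)
```

The topic holds JSON while the target receives CSV, and neither connector function touches the serialization itself. Swapping in a different converter would change the topic format without changing either connector.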

Configuring converters

Kafka Connect takes its default converter configuration at the worker level, and this can be overridden per connector. Because it is usually good practice to use the same serialization format throughout a pipeline, you often only need to set the converter on the worker and never specify it in a connector. But if you are pulling data from other topics that use a different serialization format, you specify the converter in the connector configuration. Even when you override it in the connector configuration, it is still the converter that performs the work.
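The worker-default-with-override rule can be modeled as a small lookup. The following Python sketch is a simplified model under one stated assumption: when a connector overrides a converter, the settings for that converter (everything under the same prefix) are taken from the connector configuration rather than merged with the worker's. The function name and sample values are illustrative.

```python
WORKER_DEFAULTS = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
}

def effective_converter_config(worker: dict, connector: dict) -> dict:
    """Return the converter settings that apply to one connector.

    For each of key.converter and value.converter, the connector's
    settings win if the connector names a converter; otherwise the
    worker's defaults are used.
    """
    result = {}
    for kind in ("key.converter", "value.converter"):
        source = connector if kind in connector else worker
        for name, value in source.items():
            if name == kind or name.startswith(kind + "."):
                result[name] = value
    return result

# A connector that reads a topic holding schemaless JSON values.
override = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}
effective = effective_converter_config(WORKER_DEFAULTS, override)
```

Here the key converter still comes from the worker, while the value converter and its settings come entirely from the connector.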

A correctly written connector never serializes or deserializes the messages stored in Kafka itself, but leaves that job to the configured converter.

Note that Kafka messages are just key/value byte pairs, so you need to use the key.converter and value.converter settings to specify a converter for both the key and the value. In some cases, you can use different converters for the key and the value.

Here is an example of using the String converter. Since it is just a string and the data has no schema, it is not that useful for the value:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

Some converters have additional settings. For Avro you need to specify the Schema Registry; for JSON you need to specify whether you want Kafka Connect to embed the schema in the JSON itself. When specifying a setting for a converter, prefix it with key.converter. or value.converter. as appropriate. For example, to use Avro for the message value, you need to specify the following settings:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
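The prefixing rule is mechanical, so it can be expressed as a small helper. This Python sketch builds the prefixed settings for a key or value converter; `converter_settings` is a hypothetical helper for illustration, not something Kafka Connect provides.

```python
from typing import Optional

def converter_settings(role: str, converter_class: str,
                       options: Optional[dict] = None) -> dict:
    """Build the prefixed settings for a key or value converter.

    role is "key" or "value"; options maps unprefixed setting names
    (for example "schema.registry.url") to their values.
    """
    if role not in ("key", "value"):
        raise ValueError("role must be 'key' or 'value'")
    prefix = f"{role}.converter"
    settings = {prefix: converter_class}
    for name, value in (options or {}).items():
        settings[f"{prefix}.{name}"] = value
    return settings

# String keys, Avro values: the combination discussed above.
config = {}
config.update(converter_settings(
    "key", "org.apache.kafka.connect.storage.StringConverter"))
config.update(converter_settings(
    "value", "io.confluent.connect.avro.AvroConverter",
    {"schema.registry.url": "http://schema-registry:8081"}))
```

The resulting dictionary contains the same value.converter entries as the configuration shown above, plus the key converter.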

Common converters include:

  • Avro: part of the Confluent Platform
io.confluent.connect.avro.AvroConverter
  • String: part of Apache Kafka
org.apache.kafka.connect.storage.StringConverter
  • JSON: part of Apache Kafka
org.apache.kafka.connect.json.JsonConverter
  • ByteArray: part of Apache Kafka
org.apache.kafka.connect.converters.ByteArrayConverter
  • Protobuf: community open source
com.blueapron.connect.protobuf.ProtobufConverter

JSON and schemas

While JSON does not carry a schema by default, Kafka Connect supports a specific JSON format in which the schema is embedded. Because the schema is included in every message, the resulting data can be considerably larger.

If you are configuring a Kafka Connect source and you want Kafka Connect to include the schema in the messages it writes to Kafka, you need the following settings:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

The resulting message written to Kafka looks like this, with schema and payload as the top-level elements of the JSON:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "registertime"
      },
      {
        "type": "string",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "regionid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "gender"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "MALE"
  }
}

Notice the size of the message, and how much of it is made up of the schema rather than the payload. Considering that this is repeated in every message, you can see why a format like Avro makes sense: the schema is stored separately and the message contains only the payload (in a compact binary form).
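To put a number on that overhead, this short Python sketch rebuilds the example message above and compares the size of the whole message with the size of the payload alone. It measures whitespace-free JSON, which is an assumption about how the message is written; `field` and `compact_size` are illustrative helpers.

```python
import json

def field(name: str, type_: str) -> dict:
    """Build one field entry in the style of the embedded schema above."""
    return {"type": type_, "optional": False, "field": name}

message = {
    "schema": {
        "type": "struct",
        "fields": [
            field("registertime", "int64"),
            field("userid", "string"),
            field("regionid", "string"),
            field("gender", "string"),
        ],
        "optional": False,
        "name": "ksql.users",
    },
    "payload": {
        "registertime": 1493819497170,
        "userid": "User_1",
        "regionid": "Region_5",
        "gender": "MALE",
    },
}

def compact_size(obj) -> int:
    """Size in bytes of the object as whitespace-free UTF-8 JSON."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))

total_bytes = compact_size(message)
payload_bytes = compact_size(message["payload"])
schema_share = 1 - payload_bytes / total_bytes

print(f"total={total_bytes}B payload={payload_bytes}B "
      f"schema overhead={schema_share:.0%}")
```

For this four-field record, well over half of every message is schema rather than data.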

If you use a Kafka Connect sink to consume JSON data from a Kafka topic, you need to know whether the data includes a schema. If it does, it must be in the same format as shown above, not some arbitrary format, and the configuration is as follows:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

However, the JSON data you are consuming may not have the schema/payload structure, and may look like this instead:

{
  "registertime": 1489869013625,
  "userid": "User_1",
  "regionid": "Region_2",
  "gender": "OTHER"
}

In that case, you must tell Kafka Connect not to look for a schema, with the following configuration:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

As before, remember that the converter setting (here schemas.enable) needs the appropriate key.converter or value.converter prefix.
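If you are unsure which of the two JSON shapes a topic contains, a quick check on a sample message helps. The following Python sketch decides whether a message has the schema/payload envelope and suggests the matching schemas.enable value. The function names are illustrative, and the check mirrors the rule that the envelope has exactly those two top-level fields.

```python
import json

def has_schema_envelope(raw: bytes) -> bool:
    """True if the message is a JSON object whose only top-level
    fields are "schema" and "payload"."""
    doc = json.loads(raw.decode("utf-8"))
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}

def suggested_schemas_enable(raw: bytes) -> str:
    """Suggest a value for the schemas.enable converter setting."""
    return "true" if has_schema_envelope(raw) else "false"

with_schema = json.dumps({
    "schema": {"type": "struct", "fields": [], "optional": False},
    "payload": {"userid": "User_1"},
}).encode("utf-8")
plain = b'{"userid": "User_1", "gender": "OTHER"}'
```

A single sample is only a hint: every message in the topic has to have the same shape for the setting to work.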

Common errors

If the converter is configured incorrectly in Kafka Connect, you may see the following common errors. These messages appear on the sink side, because that is the point at which messages stored in Kafka are deserialized. Converter problems do not usually occur on the source side, because that is where the serialization is set. Each of these causes the connector to fail, starting with the following error:

ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)

After this error, you see an exception stack trace that describes the cause. Note that the error above is thrown for any fatal error in a connector, so you may well see it for problems unrelated to serialization. To quickly pin down which misconfiguration you have, refer to the problems and solutions below:

Problem: Reading non-JSON data using JsonConverter

If you have non-JSON data on the source topic, but use JsonConverter to read it, you’ll see:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

This may be because the source topic is serialized in Avro or some other format.

Solution: If the data is actually in Avro format, change the Kafka Connect sink to use:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

Alternatively, if the topic is populated by Kafka Connect, you can change the upstream source to emit JSON data instead:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

Problem: Reading non-Avro data using AvroConverter

This is the most common error. It occurs when you try to use AvroConverter to read data from a topic that is not in Avro format, including data written by an Avro serializer other than the one that works with the Confluent Schema Registry:

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Solution: Check the serialization format of the source topic, and either change the Kafka Connect sink to use the correct converter or change the upstream format to Avro (which is better). If the upstream topic is populated by Kafka Connect, you can configure the converter of the source connector as follows:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
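The "Unknown magic byte!" message refers to the framing that the Confluent Avro serializer puts in front of each message: a single zero byte followed by a 4-byte big-endian schema ID, and only then the Avro-encoded data. The following Python sketch checks raw message bytes for that framing. `confluent_schema_id` is an illustrative name, and the check is only a heuristic, since other binary data can also begin with a zero byte.

```python
import struct
from typing import Optional

MAGIC_BYTE = 0  # first byte of a message in the Confluent wire format

def confluent_schema_id(raw: bytes) -> Optional[int]:
    """Return the schema ID if the bytes look like the Confluent wire
    format (magic byte + 4-byte big-endian schema ID), else None."""
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id

# Hypothetical samples: a framed message with schema ID 42, and plain JSON.
framed = b"\x00\x00\x00\x00\x2a" + b"\x02\x0cUser_9"
plain_json = b'{"userid":"User_9"}'
```

For plain JSON the first byte is `{` rather than zero, which is the situation that AvroConverter reports as an unknown magic byte.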

Problem: Reading JSON data without the desired schema/payload structure

As mentioned earlier, Kafka Connect supports a special JSON message structure that contains both the payload and the schema. If you read JSON data that does not have this structure, you get the following error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Remember that the only valid JSON structure for schemas.enable=true is one with schema and payload as the top-level elements (as shown above).

As the error message itself states, if you just have plain JSON data, you should change your connector's configuration to:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

If you want to include the schema in your data, either switch to Avro (recommended) or configure the upstream Kafka Connect source to include the schema in its messages:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
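The three problems above can be summarized as a lookup from a fragment of the stack trace to the likely cause. This Python sketch is a rough triage aid built only from the error texts quoted in this section; the `diagnose` function and its wording are illustrative, and other failures can of course produce a DataException too.

```python
# (fragment of the error text, likely cause) pairs, most specific first.
KNOWN_CAUSES = [
    ("Unknown magic byte",
     "AvroConverter is reading data that is not Confluent-framed Avro"),
    ('requires "schema" and "payload" fields',
     "JsonConverter with schemas.enable=true is reading plain JSON"),
    ("Converting byte[] to Kafka Connect data failed",
     "JsonConverter is reading data that is not JSON"),
]

def diagnose(stack_trace: str) -> str:
    """Return the likely converter misconfiguration for a stack trace."""
    for fragment, cause in KNOWN_CAUSES:
        if fragment in stack_trace:
            return cause
    return "no known converter problem matched"

sample = ("org.apache.kafka.common.errors.SerializationException: "
          "Unknown magic byte!")
print(diagnose(sample))
```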

Troubleshooting tips

Finding the Kafka Connect worker log

To view the Kafka Connect error log, you need to locate the Kafka Connect worker output. There are several ways to install Kafka Connect, including Docker, the Confluent CLI, systemd, and a manually downloaded archive. Depending on the installation method, the worker log is located as follows:

  • Docker: docker logs container_name;
  • Confluent CLI: confluent log connect;
  • Systemd: the log file is written to /var/log/confluent/kafka-connect;
  • Other: by default, Kafka Connect sends its output to stdout, so you can see it in the terminal session that started Kafka Connect.

Kafka Connect configuration files

To change settings on the Kafka Connect worker (which apply to all connectors it runs), make the change in the place that corresponds to your installation method:

  • Docker: Configure environment variables, such as in Docker Compose:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  • Confluent CLI: use the configuration file /etc/schema-registry/connect-avro-distributed.properties;
  • Systemd (deb/rpm): use the configuration file /etc/kafka/connect-distributed.properties;
  • Other: when starting Kafka Connect, you specify the worker properties file, for example:
$ cd confluent-5.0.0
$ ./bin/connect-distributed ./etc/kafka/connect-distributed
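The Docker environment variables shown above follow a visible pattern relative to the property names used elsewhere in this article: add a CONNECT_ prefix, uppercase the name, and replace dots with underscores. This Python sketch encodes just that pattern, inferred from the examples in the list. Property names that contain dashes or underscores may need additional escaping, which this sketch does not attempt.

```python
def to_docker_env_var(property_name: str) -> str:
    """Map a worker property such as "value.converter" to the
    environment variable form used in the Docker example above."""
    return "CONNECT_" + property_name.upper().replace(".", "_")

worker_properties = {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
}
environment = {to_docker_env_var(name): value
               for name, value in worker_properties.items()}
```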

Inspecting a Kafka topic

Suppose you have hit one of the errors described above and want to work out why the Kafka Connect sink cannot read data from the topic.

You need to check the data in the topic being read and verify that it is in the serialization format you expect. Also, remember that all messages must be in this format, so do not assume that there is no problem just because messages are now being sent to the topic in the correct format. Kafka Connect and other consumers also read the existing messages in the topic.
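Because old messages count too, a check has to cover every message in the topic and not just the newest one. Here is a minimal Python sketch of such a check. It assumes the message values have already been fetched as (offset, bytes) pairs by some consumer, which is outside the scope of the example, and `non_json_offsets` is an illustrative name.

```python
import json

def non_json_offsets(messages: list) -> list:
    """Return the offsets of messages whose value is not valid UTF-8 JSON.

    messages is a list of (offset, raw_bytes) pairs.
    """
    bad = []
    for offset, raw in messages:
        try:
            json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            bad.append(offset)
    return bad

# Hypothetical topic contents: binary data written before a switch to JSON.
topic = [
    (0, b"\x00\x00\x00\x00\x01\x0cUser_9"),
    (1, b'{"userid": "User_8"}'),
    (2, b"\xda\x9d\xff"),
]
bad_offsets = non_json_offsets(topic)
```

A sink configured with JsonConverter would fail on the first of the reported offsets even though the most recent messages are fine.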

The following describes troubleshooting from the command line, but other tools can do the same:

  • Confluent Control Center can display the contents of a topic visually, including automatically determining the serialization format;
  • KSQL's PRINT command prints the contents of a topic to the console, including automatically determining the serialization format;
  • The Confluent CLI has a consume command, which can be used to read data in string and Avro formats.

If you think you have string/JSON data

You can use console tools, including kafkacat and kafka-console-consumer. Using kafkacat, for example:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1
{"registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}

Using the jq command, you can also validate and format JSON:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq '.'
{
  "registertime": 1493356576434,
  "userid": "User_8",
  "regionid": "Region_2",
  "gender": "MALE"
}

If you see garbled characters like the following, it is probably binary data, such as data written as Avro or Protobuf:

$ kafkacat -b localhost:9092 -t users-avro -C -c1
ڝ�� VUser_9Region_MALE

If you think you have Avro data

You need a console tool designed for reading and deserializing Avro data, in this case kafka-avro-console-consumer. Make sure you specify the correct Schema Registry URL:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --topic users-avro \
                              --from-beginning --max-messages 1
{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}

As before, if you want to format the output, you can pipe the result to jq:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --topic users-avro \
                              --from-beginning --max-messages 1 | \
                              jq '.'
{
  "registertime": 1505213905022,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "FEMALE"
}

Internal converters

When running in distributed mode, Kafka Connect uses Kafka itself to store metadata about its operation, including connector configurations, offsets, and so on.

Through the internal.key.converter/internal.value.converter settings, these Kafka topics can themselves be configured to use different converters. However, these settings are for internal use only and have been deprecated since Apache Kafka 2.0. You should no longer change them; if you do, as of Apache Kafka 2.0 you will receive a warning.

Applying a schema to messages without a schema

Often Kafka Connect imports data from a source where a schema already exists, so as long as the schema is retained by using an appropriate serialization format (such as Avro), together with the compatibility guarantees that something like the Schema Registry provides, all downstream users of the data benefit from the schema being available. But what if there is no explicit schema?

You may be reading data from a plain-text file using FileSourceConnector (not recommended for production, but commonly used for PoCs), or pulling data from a REST endpoint using a REST connector. Since neither of these, nor some other sources, has an inherent schema, you need to declare one.

Sometimes you may just want to read bytes from the source and write them to a topic, but most of the time you need to do the right thing and apply a schema so that the data can be processed correctly. It is better to deal with this once, as part of data ingestion, than to push the problem onto every consumer (of which there may be several).

You can write your own Kafka Streams application to apply a schema to data in a Kafka topic, but you can also use KSQL. The same approach works for JSON data pulled from a REST endpoint; here I'll look at a simple example of applying a schema to some CSV data.

Suppose you have a Kafka topic named testdata-csv with some CSV data that looks like this:

$ kafkacat -b localhost:9092 -t testdata-csv -C
1,Rick Astley,Never Gonna Give You Up
2,Johnny Cash,Ring of Fire

By inspection, you can guess that it has three fields, perhaps:

  • ID;
  • Artist;
  • Song.

If you leave the data in the topic like this, then any application that wants to use it (be it a Kafka Connect sink, a custom Kafka application, or something else) has to guess this schema every time. Or, just as bad, the developer of every consuming application has to keep confirming the schema, and any changes to it, with the data provider. Just as Kafka decouples systems, this kind of schema dependency forces hard coupling between teams, which is not a good thing.

So what you do is use KSQL to apply the schema to the data and populate a new, derived topic in which the schema is preserved. In KSQL, you can inspect the topic data as follows:

ksql> PRINT 'testdata-csv' FROM BEGINNING;
Format:STRING
11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up
11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire

The first two fields here (11/6/18 2:41:23 PM UTC and NULL) are the timestamp and key of the Kafka message, respectively, while the remaining fields are from the CSV file. Let’s register the topic with KSQL and declare the schema:

ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \
WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');

Message
----------------
Stream created
----------------

KSQL now has a schema for the stream:

ksql> DESCRIBE TESTDATA_CSV;

Name                 : TESTDATA_CSV
 Field   | Type
-------------------------------------
 ROWTIME | BIGINT (system)
 ROWKEY  | VARCHAR(STRING) (system)
 ID      | INTEGER
 ARTIST  | VARCHAR(STRING)
 SONG    | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Verify that the data is as expected by querying the KSQL stream. Note that at this point you are simply acting as a Kafka consumer of the existing Kafka topic; you have not changed or copied any data.

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;
1 | Rick Astley | Never Gonna Give You Up
2 | Johnny Cash | Ring of Fire
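What the declared schema buys you can be shown outside KSQL as well. The following Python sketch applies the same three-column declaration (ID as an integer, ARTIST and SONG as strings) to a raw delimited line. It is a plain-Python analogy of the idea, not a description of how KSQL works internally, and `apply_schema` is an illustrative name.

```python
import csv

# The schema declared in the CREATE STREAM statement above.
SCHEMA = [("ID", int), ("ARTIST", str), ("SONG", str)]

def apply_schema(line: str, schema=SCHEMA) -> dict:
    """Parse one delimited line into a typed record using the schema."""
    values = next(csv.reader([line]))
    if len(values) != len(schema):
        raise ValueError(
            f"expected {len(schema)} fields, got {len(values)}")
    return {name: cast(value)
            for (name, cast), value in zip(schema, values)}

row = apply_schema("1,Rick Astley,Never Gonna Give You Up")
```

Without the schema, every consumer would have to repeat this parsing and these type decisions on its own.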

Finally, create a new Kafka topic, populated with the reserialized data and its schema. KSQL queries are continuous, so in addition to sending any existing data from the source topic to the target topic, KSQL also sends any future data to it.

ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;

Message
----------------------------
Stream created and running
----------------------------

Then validate the data using the Avro console consumer:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                                --property schema.registry.url=http://localhost:8081 \
                                --topic TESTDATA \
                                --from-beginning | \
                                jq '.'
{
  "ID": {
    "int": 1
  },
  "ARTIST": {
    "string": "Rick Astley"
  },
  "SONG": {
    "string": "Never Gonna Give You Up"
  }
}
[…]

You can even view registered schemas in the schema registry:

$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ID",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "ARTIST",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SONG",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
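The registered schema also explains the wrapped values in the console consumer output, such as {"int": 1}. Each field is a union of null and one other type, and Avro's JSON encoding writes a non-null union value as an object keyed by the type name. If you need plain values, a small helper can unwrap them. This Python sketch assumes every field is a nullable primitive, as in this schema; it would misread a genuine single-key nested object.

```python
def unwrap_unions(record: dict) -> dict:
    """Flatten {"FIELD": {"type": value}} to {"FIELD": value}.

    Null union values are encoded as plain null and pass through as None.
    """
    flat = {}
    for name, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # Take the single wrapped value, dropping the type name.
            (flat[name],) = value.values()
        else:
            flat[name] = value
    return flat

consumed = {
    "ID": {"int": 1},
    "ARTIST": {"string": "Rick Astley"},
    "SONG": None,
}
plain = unwrap_unions(consumed)
```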

Any new messages written to the original topic (testdata-csv) are automatically processed by KSQL and written in Avro format to the new topic named TESTDATA. Now any application or team that wants to use this data can simply work with the TESTDATA topic and take advantage of the Avro-serialized data with its declared schema. You can also use this technique to change the number of partitions, the partitioning key, and the replication factor of a topic.

Conclusion

Kafka Connect is a very simple yet powerful tool for integrating other systems with Kafka. The most common source of misunderstanding is the converters that Kafka Connect provides. As we have seen, Kafka messages are just key/value pairs of bytes, so it is important to know which serialization format you are using and then standardize on it in Kafka Connect.