In our work we needed to use Kafka Connect to transfer JSON data from Kafka to S3 and save it as Parquet, so let's record the demo here.

The S3 Parquet sink is not supported in earlier versions of Kafka Connect; we used Confluent 5.5.5.

In another note (juejin.cn/post/699841… we mentioned that Parquet data needs to be serialized with Avro. In that note we had the producer/consumer send Avro data directly, which allows the Kafka data to be saved straight to Parquet files.

However, in our case the data is originally written as JSON and read through JsonConverter. Since we cannot modify the producer, we need to go from JsonConverter to AvroConverter on our side.

There are two ways to deal with this:

  • Process the data outside Connect: a consumer reads the JSON data, converts it to Avro, and writes it to a new topic. This can be implemented with Ksql or by writing a Streams job (a sketch of the Streams variant follows this list).
  • Customize the Converter in Kafka Connect so that the JSON data is turned into Avro-serialized data that can be saved as Parquet.
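The Ksql route is what we actually used and is documented below; for completeness, here is a minimal sketch of the Streams-job variant. It assumes the test_topic / test_topic_avro topic names and the {id, name, age} structure used in this note, a hypothetical bootstrap server kafka:9092, and Confluent's GenericAvroSerde from the kafka-streams-avro-serde artifact; it is an illustration, not the exact job we run.

import java.util.Collections;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class JsonToAvroJob {

    // Avro schema matching the {id, name, age} JSON structure
    private static final org.apache.avro.Schema AVRO_SCHEMA = org.apache.avro.SchemaBuilder
            .record("User").fields()
            .optionalString("id")
            .optionalString("name")
            .optionalInt("age")
            .endRecord();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-to-avro-job");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // assumed address

        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(
                Collections.singletonMap("schema.registry.url", "http://schema-registry:8081"),
                false);

        ObjectMapper mapper = new ObjectMapper();
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("test_topic", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(json -> {
                    try {
                        // Parse the JSON message and rebuild it as an Avro GenericRecord
                        JsonNode node = mapper.readTree(json);
                        GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
                        record.put("id", node.path("id").asText(null));
                        record.put("name", node.path("name").asText(null));
                        record.put("age", node.path("age").isMissingNode() ? null : node.path("age").asInt());
                        return record;
                    } catch (Exception e) {
                        throw new RuntimeException("Bad JSON record: " + json, e);
                    }
                })
                .to("test_topic_avro", Produced.with(Serdes.String(), avroSerde));

        new KafkaStreams(builder.build(), props).start();
    }
}

Either way, the goal is the same: produce an Avro-serialized copy of the topic that the Parquet sink can consume.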

Converting JSON to Avro with Ksql

Our data structure in Kafka:

{
    "id":"string"."name":"string"."age":"int"
}

Create a stream table based on the data source in Ksql:

create stream json_table (id varchar, name varchar, age int) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

Create an Avro stream from it in Ksql:

CREATE STREAM avro_table WITH (KAFKA_TOPIC='test_topic_avro', REPLICAS=2, PARTITIONS=8, VALUE_FORMAT='AVRO') AS SELECT * FROM json_table;

Some SQL used:

-- View all stream tables
show streams;
-- Print the data in the Avro topic
print 'test_topic_avro';
-- Stop the stream job
TERMINATE CSAS_AVRO_TABLE_0;
-- Delete the Avro stream
drop stream AVRO_TABLE;

The Avro topic can then be consumed by the final Kafka Connect sink:

{
  "name": "parquet_sink_test"."config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector"."errors.log.include.messages": "true"."s3.region": "region"."topics.dir": "folder"."flush.size": "300"."tasks.max": "1"."timezone": "UTC"."s3.part.size": "5242880"."enhanced.avro.schema.support": "true"."rotate.interval.ms": "6000"."locale": "US"."format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"."s3.part.retries": "18"."value.converter": "io.confluent.connect.avro.AvroConverter"."errors.log.enable": "true"."s3.bucket.name": "bucket"."partition.duration.ms": "3600000"."topics": "test_topic_avro"."batch.max.rows": "100"."partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"."value.converter.schemas.enable": "true"."name": "parquet_sink_test"."storage.class": "io.confluent.connect.s3.storage.S3Storage"."rotate.schedule.interval.ms": "6000"."value.converter.schema.registry.url": "http://schema-registry:8081"."schema.registry.url": "http://schema-registry:8081"."path.format": "'log_year'=YYYY/'log_month'=MM/'log_day'=dd/'log_hour'=HH"}}Copy the code

Converting JSON to Avro with a custom Converter

Tip: our producer sends the JSON data with ByteArraySerializer. To write a custom Converter you need to implement the Converter interface:

public interface Converter {
    void configure(Map<String, ?> configs, boolean isKey);

    // Converts a Kafka Connect object into the serialized bytes written to Kafka
    byte[] fromConnectData(String topic, Schema schema, Object value);

    default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        return fromConnectData(topic, schema, value);
    }

    // Converts serialized Kafka bytes back into a Kafka Connect object
    SchemaAndValue toConnectData(String topic, byte[] value);

    default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

Let's take a look at fromConnectData and toConnectData in the official AvroConverter:

  • fromConnectData
public byte[] fromConnectData(String topic, Schema schema, Object value) {
  try {
    org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
    return serializer.serialize(
        topic,
        isKey,
        avroData.fromConnectData(schema, avroSchema, value),
        new AvroSchema(avroSchema));
  } catch (SerializationException e) {
    throw new DataException(
        String.format("Failed to serialize Avro data from topic %s :", topic),
        e
    );
  } catch (InvalidConfigurationException e) {
    throw new ConfigException(
        String.format("Failed to access Avro data from topic %s : %s", topic, e.getMessage()) ); }}Copy the code
  • toConnectData
public SchemaAndValue toConnectData(String topic, byte[] value) {
  try {
    GenericContainerWithVersion containerWithVersion =
        deserializer.deserialize(topic, isKey, value);
    if (containerWithVersion == null) {
      return SchemaAndValue.NULL;
    }
    GenericContainer deserialized = containerWithVersion.container();
    Integer version = containerWithVersion.version();
    if (deserialized instanceof IndexedRecord) {
      return avroData.toConnectData(deserialized.getSchema(), deserialized, version);
    } else if (deserialized instanceof NonRecordContainer) {
      return avroData.toConnectData(
          deserialized.getSchema(), ((NonRecordContainer) deserialized).getValue(), version);
    }
    throw new DataException(
        String.format("Unsupported type returned during deserialization of topic %s ", topic)
    );
  } catch (SerializationException e) {
    throw new DataException(
        String.format("Failed to deserialize data for topic %s to Avro: ", topic),
        e
    );
  } catch (InvalidConfigurationException e) {
    throw new ConfigException(
        String.format("Failed to access Avro data from topic %s : %s", topic, e.getMessage()) ); }}Copy the code

Because Connect is moving data from Kafka to S3, it is the toConnectData method that gets called, and its value parameter receives the raw JSON byte array. Since the downstream processing only accepts Avro-serialized data, at the start of the method we serialize the JSON data as Avro, turning it into data that the Kafka Avro deserializer can recognize.

For that, look at the fromConnectData path above: it converts Connect data into Avro-serialized bytes that the Kafka Avro stack can recognize. All we need to do is call the same logic ourselves:

org.apache.kafka.connect.data.Schema schema = SchemaBuilder.struct().name("TEST_name")
        .field("id", org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA)
        .field("name", org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA)
        .field("age", org.apache.kafka.connect.data.Schema.OPTIONAL_INT32_SCHEMA)
        .build();

// value is the raw JSON byte array passed into toConnectData
ObjectMapper objectMapper = new ObjectMapper();
User user = objectMapper.readValue(value, User.class);
Struct struct = new Struct(schema)
        .put("id", user.getId())
        .put("name", user.getName())
        .put("age", user.getAge());

org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);

byte[] avroBytes = serializer.serialize(
        topic,
        false,
        avroData.fromConnectData(schema, struct),
        new AvroSchema(avroSchema));

// From here on, continue with the official AvroConverter toConnectData logic shown above
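Putting the pieces together, here is a minimal sketch of what such a custom converter could look like. It is an assumption-laden illustration, not our exact production code: the class name MyCustomAvroConverter and its package are made up, the {id, name, age} schema is taken from the example above, and instead of touching the serializer internals it simply delegates to the stock AvroConverter.

package com.example; // hypothetical package

import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.connect.avro.AvroConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

public class MyCustomAvroConverter implements Converter {

    // Connect schema for the {id, name, age} JSON records
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name("TEST_name")
            .field("id", Schema.OPTIONAL_STRING_SCHEMA)
            .field("name", Schema.OPTIONAL_STRING_SCHEMA)
            .field("age", Schema.OPTIONAL_INT32_SCHEMA)
            .build();

    private final AvroConverter avroConverter = new AvroConverter();
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Pass value.converter.* settings (schema.registry.url, etc.) through to AvroConverter
        avroConverter.configure(configs, isKey);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Source-side path: simply delegate
        return avroConverter.fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        try {
            // 1. Parse the raw JSON bytes produced with ByteArraySerializer
            JsonNode node = objectMapper.readTree(value);
            Struct struct = new Struct(VALUE_SCHEMA)
                    .put("id", node.path("id").asText(null))
                    .put("name", node.path("name").asText(null))
                    .put("age", node.path("age").isMissingNode() ? null : node.path("age").asInt());
            // 2. Re-serialize as Avro (this also registers the schema in Schema Registry)
            byte[] avroBytes = avroConverter.fromConnectData(topic, VALUE_SCHEMA, struct);
            // 3. Continue with the stock AvroConverter logic shown above
            return avroConverter.toConnectData(topic, avroBytes);
        } catch (Exception e) {
            throw new DataException("Failed to convert JSON record from topic " + topic, e);
        }
    }
}

Delegating to AvroConverter keeps the Schema Registry handling and wire format identical to the official converter, so the ParquetFormat path downstream behaves exactly as in the Ksql approach.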

The Connect configuration then points value.converter at the custom converter (in practice this should be its fully qualified class name, with the converter jar available on the Connect worker's plugin path):

{
  "name": "parquet_sink_test"."config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector"."errors.log.include.messages": "true"."s3.region": "region"."topics.dir": "folder"."flush.size": "300"."tasks.max": "1"."timezone": "UTC"."s3.part.size": "5242880"."enhanced.avro.schema.support": "true"."rotate.interval.ms": "6000"."locale": "US"."format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"."s3.part.retries": "18"."value.converter": "MyCustomAvroConverter"."errors.log.enable": "true"."s3.bucket.name": "bucket"."partition.duration.ms": "3600000"."topics": "test_topic_avro"."batch.max.rows": "100"."partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"."value.converter.schemas.enable": "true"."name": "parquet_sink_test"."storage.class": "io.confluent.connect.s3.storage.S3Storage"."rotate.schedule.interval.ms": "6000"."value.converter.schema.registry.url": "http://schema-registry:8081"."schema.registry.url": "http://schema-registry:8081"."path.format": "'log_year'=YYYY/'log_month'=MM/'log_day'=dd/'log_hour'=HH"}}Copy the code